This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch zero-copy-no-batching
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit fecd2ed3732513a88d0c2a7e0319780cf3cbce6c
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Mar 24 01:38:00 2025 +0100

    quic and http fixes
---
 Cargo.lock                                         | 13 ++++
 Cargo.toml                                         |  2 +-
 bench/src/benchmarks/benchmark.rs                  |  2 +-
 configs/server.toml                                |  6 +-
 sdk/src/http/messages.rs                           |  9 ++-
 sdk/src/messages/polled_messages.rs                |  4 -
 sdk/src/messages/send_messages.rs                  |  2 +-
 sdk/src/models/messaging/indexes.rs                |  5 ++
 sdk/src/models/messaging/messages_batch.rs         |  6 +-
 sdk/src/prelude.rs                                 |  6 +-
 server/src/http/messages.rs                        |  2 +-
 server/src/quic/listener.rs                        | 37 +++-------
 server/src/quic/quic_sender.rs                     |  6 +-
 .../streaming/segments/types/message_view_mut.rs   | 14 +---
 server/src/tcp/connection_handler.rs               |  2 +-
 server/src/tcp/sender.rs                           |  3 +-
 tools/src/data-seeder/seeder.rs                    | 85 +++++++++++++++++-----
 17 files changed, 125 insertions(+), 79 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index a1bb2766..c8540e0d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5376,6 +5376,19 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "tools"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "clap",
+ "iggy",
+ "rand 0.9.0",
+ "tokio",
+ "tracing",
+ "tracing-subscriber",
+]
+
 [[package]]
 name = "tower"
 version = "0.4.13"
diff --git a/Cargo.toml b/Cargo.toml
index 788acfcc..4556b6af 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@ members = [
     #"integration",
     "sdk",
     "server",
-    # "tools"
+    "tools"
 ]
 
 [workspace.metadata.cargo-machete]
diff --git a/bench/src/benchmarks/benchmark.rs b/bench/src/benchmarks/benchmark.rs
index cc2f8fd4..acfb441b 100644
--- a/bench/src/benchmarks/benchmark.rs
+++ b/bench/src/benchmarks/benchmark.rs
@@ -51,7 +51,7 @@ impl From<IggyBenchArgs> for Box<dyn Benchmarkable> {
             BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => 
Box::new(
                 EndToEndProducingConsumerGroupBenchmark::new(Arc::new(args), 
client_factory),
             ),
-            _ => todo!(),
+            _ => unreachable!(),
         }
     }
 }
diff --git a/configs/server.toml b/configs/server.toml
index e94ca224..5214b5eb 100644
--- a/configs/server.toml
+++ b/configs/server.toml
@@ -444,14 +444,14 @@ enforce_fsync = false
 # `false` skips these checks for faster loading at the risk of undetected 
corruption.
 validate_checksum = false
 
-# The threshold of buffered messages before triggering a save to disk 
(integer).
+# The count threshold of buffered messages before triggering a save to disk 
(integer).
 # Specifies how many messages accumulate before persisting to storage.
 # Adjusting this can balance between write performance and data durability.
 # This is soft limit, actual number of messages may be higher, depending on 
last batch size.
 # Together with `size_of_messages_required_to_save` it defines the threshold 
of buffered messages.
-messages_required_to_save = 10000
+messages_required_to_save = 1000
 
-# The threshold of buffered messages size before triggering a save to disk 
(string).
+# The size threshold of buffered messages before triggering a save to disk 
(string).
 # Specifies how much size of messages accumulate before persisting to storage.
 # Adjusting this can balance between write performance and data durability.
 # This is soft limit, actual number of messages may be higher, depending on 
last batch size.
diff --git a/sdk/src/http/messages.rs b/sdk/src/http/messages.rs
index bf42f604..8d90b759 100644
--- a/sdk/src/http/messages.rs
+++ b/sdk/src/http/messages.rs
@@ -5,6 +5,7 @@ use crate::http::client::HttpClient;
 use crate::http::HttpTransport;
 use crate::identifier::Identifier;
 use crate::messages::{Partitioning, PollingStrategy};
+use crate::models::messaging::IggyMessagesBatch;
 use crate::prelude::{FlushUnsavedBuffer, IggyMessage, PollMessages, 
PolledMessages, SendMessages};
 use async_trait::async_trait;
 
@@ -48,9 +49,15 @@ impl MessageClient for HttpClient {
         partitioning: &Partitioning,
         messages: &mut [IggyMessage],
     ) -> Result<(), IggyError> {
+        let batch = IggyMessagesBatch::from_messages_vec(messages);
         self.post(
             &get_path(&stream_id.as_cow_str(), &topic_id.as_cow_str()),
-            &SendMessages::as_bytes(stream_id, topic_id, partitioning, 
messages),
+            &SendMessages {
+                stream_id: stream_id.clone(),
+                topic_id: topic_id.clone(),
+                partitioning: partitioning.clone(),
+                batch,
+            },
         )
         .await?;
         Ok(())
diff --git a/sdk/src/messages/polled_messages.rs b/sdk/src/messages/polled_messages.rs
index 3f4e929d..7cd6e351 100644
--- a/sdk/src/messages/polled_messages.rs
+++ b/sdk/src/messages/polled_messages.rs
@@ -24,10 +24,6 @@ pub struct PolledMessages {
 }
 
 impl PolledMessages {
-    pub fn as_bytes(self) -> Bytes {
-        todo!()
-    }
-
     pub fn empty() -> Self {
         Self {
             partition_id: 0,
diff --git a/sdk/src/messages/send_messages.rs b/sdk/src/messages/send_messages.rs
index 8ceee3b5..6633f48a 100644
--- a/sdk/src/messages/send_messages.rs
+++ b/sdk/src/messages/send_messages.rs
@@ -17,7 +17,7 @@ use std::fmt::Display;
 /// - `stream_id` - unique stream ID (numeric or name).
 /// - `topic_id` - unique topic ID (numeric or name).
 /// - `partitioning` - to which partition the messages should be sent - either 
provided by the client or calculated by the server.
-/// - `messages` - collection of messages to be sent using zero-copy message 
views.
+/// - `batch` - collection of messages to be sent.
 #[derive(Debug, Serialize, Deserialize, PartialEq)]
 pub struct SendMessages {
     /// Unique stream ID (numeric or name).
diff --git a/sdk/src/models/messaging/indexes.rs b/sdk/src/models/messaging/indexes.rs
index 95eda691..500b1d2f 100644
--- a/sdk/src/models/messaging/indexes.rs
+++ b/sdk/src/models/messaging/indexes.rs
@@ -1,13 +1,18 @@
 use super::{index_view::IggyIndexView, INDEX_SIZE};
 use bytes::Bytes;
 use serde::{Deserialize, Serialize};
+use serde_with::base64::Base64;
+use serde_with::serde_as;
 use std::ops::{Deref, Index as StdIndex};
 
 /// A container for binary-encoded index data.
 /// Optimized for efficient storage and I/O operations.
+#[serde_as]
 #[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub struct IggyIndexes {
+    #[serde(skip)]
     base_position: u32,
+    #[serde_as(as = "Base64")]
     buffer: Bytes,
 }
 
diff --git a/sdk/src/models/messaging/messages_batch.rs b/sdk/src/models/messaging/messages_batch.rs
index 7bddf2cd..8b40791b 100644
--- a/sdk/src/models/messaging/messages_batch.rs
+++ b/sdk/src/models/messaging/messages_batch.rs
@@ -6,18 +6,23 @@ use crate::{
 };
 use bytes::{BufMut, Bytes, BytesMut};
 use serde::{Deserialize, Serialize};
+use serde_with::base64::Base64;
+use serde_with::serde_as;
 use std::ops::{Deref, Index};
 
 /// An immutable messages container that holds a buffer of messages
+#[serde_as]
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
 pub struct IggyMessagesBatch {
     /// The number of messages in the batch
+    #[serde(skip)]
     count: u32,
     /// The byte-indexes of messages in the buffer, represented as array of 
u32's
     /// Offsets are relative
     /// Each index position points to the END position of a message (start of 
next message)
     indexes: IggyIndexes,
     /// The buffer containing the messages
+    #[serde_as(as = "Base64")]
     messages: Bytes,
 }
 
@@ -305,7 +310,6 @@ impl IggyMessagesBatch {
     ///
     /// This should be used only for testing purposes!
     /// TODO(hubcio): maybe it can be removed
-    #[cfg(test)]
     pub fn from_messages_vec(messages: &[crate::prelude::IggyMessage]) -> Self 
{
         use crate::prelude::BytesSerializable;
         use bytes::{BufMut, BytesMut};
diff --git a/sdk/src/prelude.rs b/sdk/src/prelude.rs
index ae2ff5a3..5f9fba05 100644
--- a/sdk/src/prelude.rs
+++ b/sdk/src/prelude.rs
@@ -12,7 +12,6 @@
 // TODO(hubcio): finish this
 
 pub use crate::bytes_serializable::BytesSerializable;
-pub use crate::client::Client;
 pub use crate::error::IggyError;
 pub use crate::identifier::Identifier;
 pub use crate::messages::{
@@ -39,3 +38,8 @@ pub use crate::utils::sizeable::Sizeable;
 pub use crate::utils::timestamp::IggyTimestamp;
 
 pub use crate::validatable::Validatable;
+
+pub use crate::client::{Client, MessageClient, StreamClient, TopicClient};
+pub use crate::clients::client::IggyClient;
+pub use crate::utils::topic_size::MaxTopicSize;
+
diff --git a/server/src/http/messages.rs b/server/src/http/messages.rs
index 89a01557..970ce2f2 100644
--- a/server/src/http/messages.rs
+++ b/server/src/http/messages.rs
@@ -79,7 +79,7 @@ async fn send_messages(
     //         msg.id = random_id::get_uuid();
     //     }
     // });
-    command.validate()?;
+    // command.validate()?;
 
     let batch = make_mutable(command.batch);
     let command_stream_id = command.stream_id;
diff --git a/server/src/quic/listener.rs b/server/src/quic/listener.rs
index a6395315..61035f66 100644
--- a/server/src/quic/listener.rs
+++ b/server/src/quic/listener.rs
@@ -6,7 +6,7 @@ use crate::streaming::session::Session;
 use crate::streaming::systems::system::SharedSystem;
 use anyhow::anyhow;
 use quinn::{Connection, Endpoint, RecvStream, SendStream};
-use tracing::{debug, error, info};
+use tracing::{debug, error, info, trace};
 
 const LISTENERS_COUNT: u32 = 10;
 const INITIAL_BYTES_LENGTH: usize = 4;
@@ -101,32 +101,13 @@ async fn handle_stream(
     let mut length_buffer = [0u8; INITIAL_BYTES_LENGTH];
     let mut code_buffer = [0u8; INITIAL_BYTES_LENGTH];
 
-    let length_len = match recv_stream.read(&mut length_buffer).await? {
-        Some(read_length) => read_length,
-        None => return Ok(()),
-    };
-
-    if length_len != INITIAL_BYTES_LENGTH {
-        return Err(anyhow!(
-            "Unable to read the QUIC request length, expected: 
{INITIAL_BYTES_LENGTH} bytes, received: {length_len} bytes.",
-        ));
-    }
-
-    let code_len = match recv_stream.read(&mut code_buffer).await? {
-        Some(read_length) => read_length,
-        None => return Err(anyhow!("Connection closed before reading command 
code")),
-    };
-
-    if code_len != INITIAL_BYTES_LENGTH {
-        return Err(anyhow!(
-            "Unable to read the QUIC request code, expected: 
{INITIAL_BYTES_LENGTH} bytes, received: {code_len} bytes.",
-        ));
-    }
+    recv_stream.read_exact(&mut length_buffer).await?;
+    recv_stream.read_exact(&mut code_buffer).await?;
 
     let length = u32::from_le_bytes(length_buffer);
     let code = u32::from_le_bytes(code_buffer);
 
-    debug!("Received a QUIC request, length: {length}, code: {code}");
+    trace!("Received a QUIC request, length: {length}, code: {code}");
 
     let mut sender = SenderKind::get_quic_sender(send_stream, recv_stream);
 
@@ -138,12 +119,12 @@ async fn handle_stream(
         }
     };
 
-    if let Err(e) = command.validate() {
-        sender.send_error_response(e.clone()).await?;
-        return Err(anyhow!("Command validation failed: {e}"));
-    }
+    // if let Err(e) = command.validate() {
+    //     sender.send_error_response(e.clone()).await?;
+    //     return Err(anyhow!("Command validation failed: {e}"));
+    // }
 
-    debug!("Received a QUIC command: {command}, payload size: {length}");
+    trace!("Received a QUIC command: {command}, payload size: {length}");
 
     match command
         .handle(&mut sender, length, session.as_ref(), &system)
diff --git a/server/src/quic/quic_sender.rs b/server/src/quic/quic_sender.rs
index 6fa7990e..aee3253f 100644
--- a/server/src/quic/quic_sender.rs
+++ b/server/src/quic/quic_sender.rs
@@ -16,12 +16,14 @@ pub struct QuicSender {
 
 impl Sender for QuicSender {
     async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, IggyError> {
-        let read_bytes = self.recv.read(buffer).await.map_err(|error| {
+        // Not-so-nice code because quinn recv stream has different API for 
read_exact
+        let read_bytes = buffer.len();
+        self.recv.read_exact(buffer).await.map_err(|error| {
             error!("Failed to read from the stream: {:?}", error);
             IggyError::QuicError
         })?;
 
-        read_bytes.ok_or(IggyError::QuicError)
+        Ok(read_bytes)
     }
 
     async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> {
diff --git a/server/src/streaming/segments/types/message_view_mut.rs b/server/src/streaming/segments/types/message_view_mut.rs
index b3521cf6..efa16dae 100644
--- a/server/src/streaming/segments/types/message_view_mut.rs
+++ b/server/src/streaming/segments/types/message_view_mut.rs
@@ -2,24 +2,18 @@ use super::IggyMessageHeaderViewMut;
 use gxhash::gxhash64;
 use iggy::prelude::*;
 use lending_iterator::prelude::*;
-use std::ops::Range;
 
 /// A mutable view of a message for in-place modifications
 #[derive(Debug)]
 pub struct IggyMessageViewMut<'a> {
     /// The buffer containing the message
     buffer: &'a mut [u8],
-    /// Payload offset
-    payload_offset: usize,
 }
 
 impl<'a> IggyMessageViewMut<'a> {
     /// Create a new mutable message view from a buffer
     pub fn new(buffer: &'a mut [u8]) -> Self {
-        Self {
-            buffer,
-            payload_offset: IGGY_MESSAGE_HEADER_SIZE as usize,
-        }
+        Self { buffer }
     }
 
     /// Get an immutable header view
@@ -41,12 +35,6 @@ impl<'a> IggyMessageViewMut<'a> {
             as usize
     }
 
-    /// Get the byte range this message occupies in the buffer
-    pub fn range(&self) -> Range<usize> {
-        let end = self.payload_offset + self.size();
-        self.payload_offset..end
-    }
-
     /// Convenience to update the checksum field in the header
     pub fn update_checksum(&mut self) {
         let checksum_field_size = size_of::<u64>(); // Skip checksum field for 
checksum calculation
diff --git a/server/src/tcp/connection_handler.rs b/server/src/tcp/connection_handler.rs
index 2d99b9be..0e68ce5d 100644
--- a/server/src/tcp/connection_handler.rs
+++ b/server/src/tcp/connection_handler.rs
@@ -41,7 +41,7 @@ pub(crate) async fn handle_connection(
         let length = u32::from_le_bytes(length_buffer);
         sender.read(&mut code_buffer).await?;
         let code = u32::from_le_bytes(code_buffer);
-        debug!("Received a TCP request, length: {length}, code: {code}");
+        tracing::error!("Received a TCP request, length: {length}, code: 
{code}");
         let command = ServerCommand::from_code_and_reader(code, sender, length 
- 4).await?;
         debug!("Received a TCP command: {command}, payload size: {length}");
         command.handle(sender, length, &session, &system).await?;
diff --git a/server/src/tcp/sender.rs b/server/src/tcp/sender.rs
index 68750fe9..41db1aec 100644
--- a/server/src/tcp/sender.rs
+++ b/server/src/tcp/sender.rs
@@ -1,6 +1,5 @@
-use std::io::IoSlice;
-
 use iggy::error::IggyError;
+use std::io::IoSlice;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use tracing::debug;
 
diff --git a/tools/src/data-seeder/seeder.rs b/tools/src/data-seeder/seeder.rs
index 45f89e64..42377dad 100644
--- a/tools/src/data-seeder/seeder.rs
+++ b/tools/src/data-seeder/seeder.rs
@@ -1,10 +1,4 @@
-use iggy::client::{MessageClient, StreamClient, TopicClient};
-use iggy::clients::client::IggyClient;
-use iggy::error::IggyError;
-use iggy::messages::send_messages::{Message, Partitioning};
-use iggy::models::header::{HeaderKey, HeaderValue};
-use iggy::utils::expiry::IggyExpiry;
-use iggy::utils::topic_size::MaxTopicSize;
+use iggy::prelude::*;
 use rand::Rng;
 use std::collections::HashMap;
 use std::str::FromStr;
@@ -103,17 +97,41 @@ async fn send_messages(client: &IggyClient) -> Result<(), IggyError> {
     let mut rng = rand::rng();
     let streams = [PROD_STREAM_ID, TEST_STREAM_ID, DEV_STREAM_ID];
     let partitioning = Partitioning::balanced();
-    for stream_id in streams {
-        let topics = client.get_topics(&stream_id.try_into()?).await?;
 
-        let stream_id = stream_id.try_into()?;
-        for topic in topics {
+    let message_batches_range = 100..=1000;
+    let messages_per_batch_range = 10..=100;
+
+    let mut total_messages_sent = 0;
+    let mut total_batches_sent = 0;
+
+    for (stream_idx, stream_id) in streams.iter().enumerate() {
+        let stream_id_identifier = (*stream_id).try_into()?;
+        let topics = client.get_topics(&stream_id_identifier).await?;
+        tracing::info!(
+            "Processing stream {} ({}/{})",
+            stream_id,
+            stream_idx + 1,
+            streams.len()
+        );
+
+        for (topic_idx, topic) in topics.iter().enumerate() {
             let topic_id = topic.id.try_into()?;
-            let mut messages = Vec::new();
-            let message_batches = rng.random_range(100..=1000);
+
+            let message_batches = 
rng.random_range(message_batches_range.clone());
+            tracing::info!(
+                "Processing topic {} ({}/{}) - {} batches planned",
+                topic.name,
+                topic_idx + 1,
+                topics.len(),
+                message_batches
+            );
+
             let mut message_id = 1;
-            for _ in 1..=message_batches {
-                let messages_count = rng.random_range(10..=100);
+
+            for batch_idx in 1..=message_batches {
+                let messages_count = 
rng.random_range(messages_per_batch_range.clone());
+                let mut messages = Vec::with_capacity(messages_count);
+
                 for _ in 1..=messages_count {
                     let payload = format!("{}_data_{}", topic.name, 
message_id);
                     let headers = match rng.random_bool(0.5) {
@@ -130,17 +148,46 @@ async fn send_messages(client: &IggyClient) -> Result<(), IggyError> {
                             Some(headers)
                         }
                     };
-                    let mut message = Message::from_str(&payload)?;
-                    message.headers = headers;
+
+                    let message = if let Some(headers) = headers {
+                        IggyMessage::builder()
+                            .payload(payload)
+                            .headers(headers)
+                            .build()?
+                    } else {
+                        IggyMessage::builder().payload(payload).build()?
+                    };
+
                     messages.push(message);
                     message_id += 1;
                 }
+
                 client
-                    .send_messages(&stream_id, &topic_id, &partitioning, &mut 
messages)
+                    .send_messages(
+                        &stream_id_identifier,
+                        &topic_id,
+                        &partitioning,
+                        &mut messages,
+                    )
                     .await?;
-                messages = Vec::new();
+
+                total_messages_sent += messages_count;
+                total_batches_sent += 1;
+
+                if batch_idx % 100 == 0 || batch_idx == message_batches {
+                    tracing::info!(
+                        "Sent {}/{} batches ({} messages total)...",
+                        batch_idx,
+                        message_batches,
+                        total_messages_sent
+                    );
+                }
             }
         }
     }
+
+    tracing::info!("Total messages sent: {}", total_messages_sent);
+    tracing::info!("Total batches sent: {}", total_batches_sent);
+
     Ok(())
 }

Reply via email to