This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch zero-copy-no-batching in repository https://gitbox.apache.org/repos/asf/iggy.git
commit fecd2ed3732513a88d0c2a7e0319780cf3cbce6c Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Mar 24 01:38:00 2025 +0100 quic and http fixes --- Cargo.lock | 13 ++++ Cargo.toml | 2 +- bench/src/benchmarks/benchmark.rs | 2 +- configs/server.toml | 6 +- sdk/src/http/messages.rs | 9 ++- sdk/src/messages/polled_messages.rs | 4 - sdk/src/messages/send_messages.rs | 2 +- sdk/src/models/messaging/indexes.rs | 5 ++ sdk/src/models/messaging/messages_batch.rs | 6 +- sdk/src/prelude.rs | 6 +- server/src/http/messages.rs | 2 +- server/src/quic/listener.rs | 37 +++------- server/src/quic/quic_sender.rs | 6 +- .../streaming/segments/types/message_view_mut.rs | 14 +--- server/src/tcp/connection_handler.rs | 2 +- server/src/tcp/sender.rs | 3 +- tools/src/data-seeder/seeder.rs | 85 +++++++++++++++++----- 17 files changed, 125 insertions(+), 79 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a1bb2766..c8540e0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5376,6 +5376,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "tools" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "iggy", + "rand 0.9.0", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "tower" version = "0.4.13" diff --git a/Cargo.toml b/Cargo.toml index 788acfcc..4556b6af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ members = [ #"integration", "sdk", "server", - # "tools" + "tools" ] [workspace.metadata.cargo-machete] diff --git a/bench/src/benchmarks/benchmark.rs b/bench/src/benchmarks/benchmark.rs index cc2f8fd4..acfb441b 100644 --- a/bench/src/benchmarks/benchmark.rs +++ b/bench/src/benchmarks/benchmark.rs @@ -51,7 +51,7 @@ impl From<IggyBenchArgs> for Box<dyn Benchmarkable> { BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => Box::new( EndToEndProducingConsumerGroupBenchmark::new(Arc::new(args), client_factory), ), - _ => todo!(), + _ => unreachable!(), } } } diff --git a/configs/server.toml b/configs/server.toml index e94ca224..5214b5eb 100644 --- a/configs/server.toml +++ b/configs/server.toml @@ -444,14 +444,14 @@ enforce_fsync = false # `false` skips these checks for faster loading at the risk of undetected corruption. validate_checksum = false -# The threshold of buffered messages before triggering a save to disk (integer). +# The count threshold of buffered messages before triggering a save to disk (integer). # Specifies how many messages accumulate before persisting to storage. # Adjusting this can balance between write performance and data durability. # This is soft limit, actual number of messages may be higher, depending on last batch size. # Together with `size_of_messages_required_to_save` it defines the threshold of buffered messages. -messages_required_to_save = 10000 +messages_required_to_save = 1000 -# The threshold of buffered messages size before triggering a save to disk (string). +# The size threshold of buffered messages before triggering a save to disk (string). # Specifies how much size of messages accumulate before persisting to storage. # Adjusting this can balance between write performance and data durability. # This is soft limit, actual number of messages may be higher, depending on last batch size. diff --git a/sdk/src/http/messages.rs b/sdk/src/http/messages.rs index bf42f604..8d90b759 100644 --- a/sdk/src/http/messages.rs +++ b/sdk/src/http/messages.rs @@ -5,6 +5,7 @@ use crate::http::client::HttpClient; use crate::http::HttpTransport; use crate::identifier::Identifier; use crate::messages::{Partitioning, PollingStrategy}; +use crate::models::messaging::IggyMessagesBatch; use crate::prelude::{FlushUnsavedBuffer, IggyMessage, PollMessages, PolledMessages, SendMessages}; use async_trait::async_trait; @@ -48,9 +49,15 @@ impl MessageClient for HttpClient { partitioning: &Partitioning, messages: &mut [IggyMessage], ) -> Result<(), IggyError> { + let batch = IggyMessagesBatch::from_messages_vec(messages); self.post( &get_path(&stream_id.as_cow_str(), &topic_id.as_cow_str()), - &SendMessages::as_bytes(stream_id, topic_id, partitioning, messages), + &SendMessages { + stream_id: stream_id.clone(), + topic_id: topic_id.clone(), + partitioning: partitioning.clone(), + batch, + }, ) .await?; Ok(()) diff --git a/sdk/src/messages/polled_messages.rs b/sdk/src/messages/polled_messages.rs index 3f4e929d..7cd6e351 100644 --- a/sdk/src/messages/polled_messages.rs +++ b/sdk/src/messages/polled_messages.rs @@ -24,10 +24,6 @@ pub struct PolledMessages { } impl PolledMessages { - pub fn as_bytes(self) -> Bytes { - todo!() - } - pub fn empty() -> Self { Self { partition_id: 0, diff --git a/sdk/src/messages/send_messages.rs b/sdk/src/messages/send_messages.rs index 8ceee3b5..6633f48a 100644 --- a/sdk/src/messages/send_messages.rs +++ b/sdk/src/messages/send_messages.rs @@ -17,7 +17,7 @@ use std::fmt::Display; /// - `stream_id` - unique stream ID (numeric or name). /// - `topic_id` - unique topic ID (numeric or name). /// - `partitioning` - to which partition the messages should be sent - either provided by the client or calculated by the server. -/// - `messages` - collection of messages to be sent using zero-copy message views. +/// - `batch` - collection of messages to be sent. #[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct SendMessages { /// Unique stream ID (numeric or name). diff --git a/sdk/src/models/messaging/indexes.rs b/sdk/src/models/messaging/indexes.rs index 95eda691..500b1d2f 100644 --- a/sdk/src/models/messaging/indexes.rs +++ b/sdk/src/models/messaging/indexes.rs @@ -1,13 +1,18 @@ use super::{index_view::IggyIndexView, INDEX_SIZE}; use bytes::Bytes; use serde::{Deserialize, Serialize}; +use serde_with::base64::Base64; +use serde_with::serde_as; use std::ops::{Deref, Index as StdIndex}; /// A container for binary-encoded index data. /// Optimized for efficient storage and I/O operations. +#[serde_as] #[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct IggyIndexes { + #[serde(skip)] base_position: u32, + #[serde_as(as = "Base64")] buffer: Bytes, } diff --git a/sdk/src/models/messaging/messages_batch.rs b/sdk/src/models/messaging/messages_batch.rs index 7bddf2cd..8b40791b 100644 --- a/sdk/src/models/messaging/messages_batch.rs +++ b/sdk/src/models/messaging/messages_batch.rs @@ -6,18 +6,23 @@ use crate::{ }; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; +use serde_with::base64::Base64; +use serde_with::serde_as; use std::ops::{Deref, Index}; /// An immutable messages container that holds a buffer of messages +#[serde_as] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct IggyMessagesBatch { /// The number of messages in the batch + #[serde(skip)] count: u32, /// The byte-indexes of messages in the buffer, represented as array of u32's /// Offsets are relative /// Each index position points to the END position of a message (start of next message) indexes: IggyIndexes, /// The buffer containing the messages + #[serde_as(as = "Base64")] messages: Bytes, } @@ -305,7 +310,6 @@ impl IggyMessagesBatch { /// /// This should be used only for testing purposes! /// TODO(hubcio): maybe it can be removed - #[cfg(test)] pub fn from_messages_vec(messages: &[crate::prelude::IggyMessage]) -> Self { use crate::prelude::BytesSerializable; use bytes::{BufMut, BytesMut}; diff --git a/sdk/src/prelude.rs b/sdk/src/prelude.rs index ae2ff5a3..5f9fba05 100644 --- a/sdk/src/prelude.rs +++ b/sdk/src/prelude.rs @@ -12,7 +12,6 @@ // TODO(hubcio): finish this pub use crate::bytes_serializable::BytesSerializable; -pub use crate::client::Client; pub use crate::error::IggyError; pub use crate::identifier::Identifier; pub use crate::messages::{ @@ -39,3 +38,8 @@ pub use crate::utils::sizeable::Sizeable; pub use crate::utils::timestamp::IggyTimestamp; pub use crate::validatable::Validatable; + +pub use crate::client::{Client, MessageClient, StreamClient, TopicClient}; +pub use crate::clients::client::IggyClient; +pub use crate::utils::topic_size::MaxTopicSize; + diff --git a/server/src/http/messages.rs b/server/src/http/messages.rs index 89a01557..970ce2f2 100644 --- a/server/src/http/messages.rs +++ b/server/src/http/messages.rs @@ -79,7 +79,7 @@ async fn send_messages( // msg.id = random_id::get_uuid(); // } // }); - command.validate()?; + // command.validate()?; let batch = make_mutable(command.batch); let command_stream_id = command.stream_id; diff --git a/server/src/quic/listener.rs b/server/src/quic/listener.rs index a6395315..61035f66 100644 --- a/server/src/quic/listener.rs +++ b/server/src/quic/listener.rs @@ -6,7 +6,7 @@ use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::anyhow; use quinn::{Connection, Endpoint, RecvStream, SendStream}; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, trace}; const LISTENERS_COUNT: u32 = 10; const INITIAL_BYTES_LENGTH: usize = 4; @@ -101,32 +101,13 @@ async fn handle_stream( let mut length_buffer = [0u8; INITIAL_BYTES_LENGTH]; let mut code_buffer = [0u8; INITIAL_BYTES_LENGTH]; - let length_len = match recv_stream.read(&mut length_buffer).await? { - Some(read_length) => read_length, - None => return Ok(()), - }; - - if length_len != INITIAL_BYTES_LENGTH { - return Err(anyhow!( - "Unable to read the QUIC request length, expected: {INITIAL_BYTES_LENGTH} bytes, received: {length_len} bytes.", - )); - } - - let code_len = match recv_stream.read(&mut code_buffer).await? { - Some(read_length) => read_length, - None => return Err(anyhow!("Connection closed before reading command code")), - }; - - if code_len != INITIAL_BYTES_LENGTH { - return Err(anyhow!( - "Unable to read the QUIC request code, expected: {INITIAL_BYTES_LENGTH} bytes, received: {code_len} bytes.", - )); - } + recv_stream.read_exact(&mut length_buffer).await?; + recv_stream.read_exact(&mut code_buffer).await?; let length = u32::from_le_bytes(length_buffer); let code = u32::from_le_bytes(code_buffer); - debug!("Received a QUIC request, length: {length}, code: {code}"); + trace!("Received a QUIC request, length: {length}, code: {code}"); let mut sender = SenderKind::get_quic_sender(send_stream, recv_stream); @@ -138,12 +119,12 @@ async fn handle_stream( } }; - if let Err(e) = command.validate() { - sender.send_error_response(e.clone()).await?; - return Err(anyhow!("Command validation failed: {e}")); - } + // if let Err(e) = command.validate() { + // sender.send_error_response(e.clone()).await?; + // return Err(anyhow!("Command validation failed: {e}")); + // } - debug!("Received a QUIC command: {command}, payload size: {length}"); + trace!("Received a QUIC command: {command}, payload size: {length}"); match command .handle(&mut sender, length, session.as_ref(), &system) diff --git a/server/src/quic/quic_sender.rs b/server/src/quic/quic_sender.rs index 6fa7990e..aee3253f 100644 --- a/server/src/quic/quic_sender.rs +++ b/server/src/quic/quic_sender.rs @@ -16,12 +16,14 @@ pub struct QuicSender { impl Sender for QuicSender { async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, IggyError> { - let read_bytes = self.recv.read(buffer).await.map_err(|error| { + // Not-so-nice code because quinn recv stream has different API for read_exact + let read_bytes = buffer.len(); + self.recv.read_exact(buffer).await.map_err(|error| { error!("Failed to read from the stream: {:?}", error); IggyError::QuicError })?; - read_bytes.ok_or(IggyError::QuicError) + Ok(read_bytes) } async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> { diff --git a/server/src/streaming/segments/types/message_view_mut.rs b/server/src/streaming/segments/types/message_view_mut.rs index b3521cf6..efa16dae 100644 --- a/server/src/streaming/segments/types/message_view_mut.rs +++ b/server/src/streaming/segments/types/message_view_mut.rs @@ -2,24 +2,18 @@ use super::IggyMessageHeaderViewMut; use gxhash::gxhash64; use iggy::prelude::*; use lending_iterator::prelude::*; -use std::ops::Range; /// A mutable view of a message for in-place modifications #[derive(Debug)] pub struct IggyMessageViewMut<'a> { /// The buffer containing the message buffer: &'a mut [u8], - /// Payload offset - payload_offset: usize, } impl<'a> IggyMessageViewMut<'a> { /// Create a new mutable message view from a buffer pub fn new(buffer: &'a mut [u8]) -> Self { - Self { - buffer, - payload_offset: IGGY_MESSAGE_HEADER_SIZE as usize, - } + Self { buffer } } /// Get an immutable header view @@ -41,12 +35,6 @@ impl<'a> IggyMessageViewMut<'a> { as usize } - /// Get the byte range this message occupies in the buffer - pub fn range(&self) -> Range<usize> { - let end = self.payload_offset + self.size(); - self.payload_offset..end - } - /// Convenience to update the checksum field in the header pub fn update_checksum(&mut self) { let checksum_field_size = size_of::<u64>(); // Skip checksum field for checksum calculation diff --git a/server/src/tcp/connection_handler.rs b/server/src/tcp/connection_handler.rs index 2d99b9be..0e68ce5d 100644 --- a/server/src/tcp/connection_handler.rs +++ b/server/src/tcp/connection_handler.rs @@ -41,7 +41,7 @@ pub(crate) async fn handle_connection( let length = u32::from_le_bytes(length_buffer); sender.read(&mut code_buffer).await?; let code = u32::from_le_bytes(code_buffer); - debug!("Received a TCP request, length: {length}, code: {code}"); + tracing::error!("Received a TCP request, length: {length}, code: {code}"); let command = ServerCommand::from_code_and_reader(code, sender, length - 4).await?; debug!("Received a TCP command: {command}, payload size: {length}"); command.handle(sender, length, &session, &system).await?; diff --git a/server/src/tcp/sender.rs b/server/src/tcp/sender.rs index 68750fe9..41db1aec 100644 --- a/server/src/tcp/sender.rs +++ b/server/src/tcp/sender.rs @@ -1,6 +1,5 @@ -use std::io::IoSlice; - use iggy::error::IggyError; +use std::io::IoSlice; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tracing::debug; diff --git a/tools/src/data-seeder/seeder.rs b/tools/src/data-seeder/seeder.rs index 45f89e64..42377dad 100644 --- a/tools/src/data-seeder/seeder.rs +++ b/tools/src/data-seeder/seeder.rs @@ -1,10 +1,4 @@ -use iggy::client::{MessageClient, StreamClient, TopicClient}; -use iggy::clients::client::IggyClient; -use iggy::error::IggyError; -use iggy::messages::send_messages::{Message, Partitioning}; -use iggy::models::header::{HeaderKey, HeaderValue}; -use iggy::utils::expiry::IggyExpiry; -use iggy::utils::topic_size::MaxTopicSize; +use iggy::prelude::*; use rand::Rng; use std::collections::HashMap; use std::str::FromStr; @@ -103,17 +97,41 @@ async fn send_messages(client: &IggyClient) -> Result<(), IggyError> { let mut rng = rand::rng(); let streams = [PROD_STREAM_ID, TEST_STREAM_ID, DEV_STREAM_ID]; let partitioning = Partitioning::balanced(); - for stream_id in streams { - let topics = client.get_topics(&stream_id.try_into()?).await?; - let stream_id = stream_id.try_into()?; - for topic in topics { + let message_batches_range = 100..=1000; + let messages_per_batch_range = 10..=100; + + let mut total_messages_sent = 0; + let mut total_batches_sent = 0; + + for (stream_idx, stream_id) in streams.iter().enumerate() { + let stream_id_identifier = (*stream_id).try_into()?; + let topics = client.get_topics(&stream_id_identifier).await?; + tracing::info!( + "Processing stream {} ({}/{})", + stream_id, + stream_idx + 1, + streams.len() + ); + + for (topic_idx, topic) in topics.iter().enumerate() { let topic_id = topic.id.try_into()?; - let mut messages = Vec::new(); - let message_batches = rng.random_range(100..=1000); + + let message_batches = rng.random_range(message_batches_range.clone()); + tracing::info!( + "Processing topic {} ({}/{}) - {} batches planned", + topic.name, + topic_idx + 1, + topics.len(), + message_batches + ); + let mut message_id = 1; - for _ in 1..=message_batches { - let messages_count = rng.random_range(10..=100); + + for batch_idx in 1..=message_batches { + let messages_count = rng.random_range(messages_per_batch_range.clone()); + let mut messages = Vec::with_capacity(messages_count); + for _ in 1..=messages_count { let payload = format!("{}_data_{}", topic.name, message_id); let headers = match rng.random_bool(0.5) { @@ -130,17 +148,46 @@ async fn send_messages(client: &IggyClient) -> Result<(), IggyError> { Some(headers) } }; - let mut message = Message::from_str(&payload)?; - message.headers = headers; + + let message = if let Some(headers) = headers { + IggyMessage::builder() + .payload(payload) + .headers(headers) + .build()? + } else { + IggyMessage::builder().payload(payload).build()? + }; + messages.push(message); message_id += 1; } + client - .send_messages(&stream_id, &topic_id, &partitioning, &mut messages) + .send_messages( + &stream_id_identifier, + &topic_id, + &partitioning, + &mut messages, + ) .await?; - messages = Vec::new(); + + total_messages_sent += messages_count; + total_batches_sent += 1; + + if batch_idx % 100 == 0 || batch_idx == message_batches { + tracing::info!( + "Sent {}/{} batches ({} messages total)...", + batch_idx, + message_batches, + total_messages_sent + ); + } } } } + + tracing::info!("Total messages sent: {}", total_messages_sent); + tracing::info!("Total batches sent: {}", total_batches_sent); + Ok(()) }
