This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch zero-copy-no-batching in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 24b59da1923e5243617c638b5ef0224f4d9097d3 Author: Hubert Gruszecki <[email protected]> AuthorDate: Sun Mar 23 23:07:31 2025 +0100 quic finished --- sdk/src/error.rs | 2 +- server/src/http/messages.rs | 1 - server/src/quic/listener.rs | 79 +++++++++++++++++++++++++----------------- server/src/quic/quic_sender.rs | 47 ++++++++++++++++++++++--- 4 files changed, 92 insertions(+), 37 deletions(-) diff --git a/sdk/src/error.rs b/sdk/src/error.rs index 9f7f24d5..94426947 100644 --- a/sdk/src/error.rs +++ b/sdk/src/error.rs @@ -3,7 +3,7 @@ use crate::utils::topic_size::MaxTopicSize; use strum::{EnumDiscriminants, FromRepr, IntoStaticStr}; use thiserror::Error; -#[derive(Debug, Error, EnumDiscriminants, IntoStaticStr, FromRepr)] +#[derive(Clone, Debug, Error, EnumDiscriminants, IntoStaticStr, FromRepr)] #[repr(u32)] #[strum(serialize_all = "snake_case")] #[strum_discriminants( diff --git a/server/src/http/messages.rs b/server/src/http/messages.rs index 70e4a84e..89a01557 100644 --- a/server/src/http/messages.rs +++ b/server/src/http/messages.rs @@ -5,7 +5,6 @@ use crate::http::COMPONENT; use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut}; use crate::streaming::session::Session; use crate::streaming::systems::messages::PollingArgs; -use crate::streaming::utils::random_id; use axum::extract::{Path, Query, State}; use axum::http::StatusCode; use axum::routing::get; diff --git a/server/src/quic/listener.rs b/server/src/quic/listener.rs index b1abec11..a6395315 100644 --- a/server/src/quic/listener.rs +++ b/server/src/quic/listener.rs @@ -1,13 +1,10 @@ -use crate::binary::command; +use crate::binary::command::{ServerCommand, ServerCommandHandler}; use crate::binary::sender::SenderKind; use crate::server_error::ConnectionError; use crate::streaming::clients::client_manager::Transport; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; -use anyhow::{anyhow, Context}; -use bytes::Bytes; -use iggy::validatable::Validatable; -use iggy::{bytes_serializable::BytesSerializable, messages::MAX_PAYLOAD_SIZE}; +use anyhow::anyhow; use quinn::{Connection, Endpoint, RecvStream, SendStream}; use tracing::{debug, error, info}; @@ -99,40 +96,60 @@ async fn handle_stream( system: SharedSystem, session: impl AsRef<Session>, ) -> anyhow::Result<()> { - // TODO: Fix me - /* let (send_stream, mut recv_stream) = stream; - // TODO: read to BytesMut instead of Vec<u8> - let request = recv_stream - .read_to_end(MAX_PAYLOAD_SIZE as usize) - .await - .with_context(|| "Error when reading the QUIC request.")?; - if request.len() < INITIAL_BYTES_LENGTH { + let mut length_buffer = [0u8; INITIAL_BYTES_LENGTH]; + let mut code_buffer = [0u8; INITIAL_BYTES_LENGTH]; + + let length_len = match recv_stream.read(&mut length_buffer).await? { + Some(read_length) => read_length, + None => return Ok(()), + }; + + if length_len != INITIAL_BYTES_LENGTH { return Err(anyhow!( - "Unable to read the QUIC request length, expected: {INITIAL_BYTES_LENGTH} bytes, received: {} bytes.", - request.len() + "Unable to read the QUIC request length, expected: {INITIAL_BYTES_LENGTH} bytes, received: {length_len} bytes.", )); } - debug!("Trying to read command..."); - let length = request[..INITIAL_BYTES_LENGTH] - .try_into() - .map(u32::from_le_bytes) - .unwrap_or_default(); - let command = - ServerCommand::from_bytes(Bytes::copy_from_slice(&request[INITIAL_BYTES_LENGTH..])) - .with_context(|| "Error when reading the QUIC request command.")?; - command - .validate() - .with_context(|| "Error when validating the QUIC command.")?; + let code_len = match recv_stream.read(&mut code_buffer).await? { + Some(read_length) => read_length, + None => return Err(anyhow!("Connection closed before reading command code")), + }; - debug!("Received a QUIC command: {command}, payload size: {length}"); + if code_len != INITIAL_BYTES_LENGTH { + return Err(anyhow!( + "Unable to read the QUIC request code, expected: {INITIAL_BYTES_LENGTH} bytes, received: {code_len} bytes.", + )); + } + + let length = u32::from_le_bytes(length_buffer); + let code = u32::from_le_bytes(code_buffer); + + debug!("Received a QUIC request, length: {length}, code: {code}"); let mut sender = SenderKind::get_quic_sender(send_stream, recv_stream); - command::handle(command, &mut sender, session.as_ref(), system.clone()) + + let command = match ServerCommand::from_code_and_reader(code, &mut sender, length - 4).await { + Ok(cmd) => cmd, + Err(e) => { + sender.send_error_response(e.clone()).await?; + return Err(anyhow!("Failed to parse command: {e}")); + } + }; + + if let Err(e) = command.validate() { + sender.send_error_response(e.clone()).await?; + return Err(anyhow!("Command validation failed: {e}")); + } + + debug!("Received a QUIC command: {command}, payload size: {length}"); + + match command + .handle(&mut sender, length, session.as_ref(), &system) .await - .with_context(|| "Error when handling the QUIC request.") - */ - todo!() + { + Ok(_) => Ok(()), + Err(e) => Err(anyhow!("Error handling command: {e}")), + } } diff --git a/server/src/quic/quic_sender.rs b/server/src/quic/quic_sender.rs index c80c124c..6fa7990e 100644 --- a/server/src/quic/quic_sender.rs +++ b/server/src/quic/quic_sender.rs @@ -1,10 +1,9 @@ -use std::io::IoSlice; - use crate::quic::COMPONENT; use crate::{binary::sender::Sender, server_error::ServerError}; use error_set::ErrContext; use iggy::error::IggyError; use quinn::{RecvStream, SendStream}; +use std::io::IoSlice; use tracing::{debug, error}; const STATUS_OK: &[u8] = &[0; 4]; @@ -47,8 +46,48 @@ impl Sender for QuicSender { length: &[u8], slices: Vec<IoSlice<'_>>, ) -> Result<(), IggyError> { - //TODO: Fix me, quinn seems to have support for write vectored. - todo!() + debug!("Sending vectored response with status: {:?}...", STATUS_OK); + + let headers = [STATUS_OK, length].concat(); + self.send + .write_all(&headers) + .await + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to write headers to stream") + }) + .map_err(|_| IggyError::QuicError)?; + + let mut total_bytes_written = 0; + + for slice in slices { + let slice_data = &*slice; + if !slice_data.is_empty() { + self.send + .write_all(slice_data) + .await + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to write slice to stream") + }) + .map_err(|_| IggyError::QuicError)?; + + total_bytes_written += slice_data.len(); + } + } + + debug!( + "Sent vectored response: {} bytes of payload", + total_bytes_written + ); + + self.send + .finish() + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to finish send stream") + }) + .map_err(|_| IggyError::QuicError)?; + + debug!("Sent vectored response with status: {:?}", STATUS_OK); + Ok(()) } }
