This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch zero-copy-no-batching
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 24b59da1923e5243617c638b5ef0224f4d9097d3
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sun Mar 23 23:07:31 2025 +0100

    quic finished
---
 sdk/src/error.rs               |  2 +-
 server/src/http/messages.rs    |  1 -
 server/src/quic/listener.rs    | 79 +++++++++++++++++++++++++-----------------
 server/src/quic/quic_sender.rs | 47 ++++++++++++++++++++++---
 4 files changed, 92 insertions(+), 37 deletions(-)

diff --git a/sdk/src/error.rs b/sdk/src/error.rs
index 9f7f24d5..94426947 100644
--- a/sdk/src/error.rs
+++ b/sdk/src/error.rs
@@ -3,7 +3,7 @@ use crate::utils::topic_size::MaxTopicSize;
 use strum::{EnumDiscriminants, FromRepr, IntoStaticStr};
 use thiserror::Error;
 
-#[derive(Debug, Error, EnumDiscriminants, IntoStaticStr, FromRepr)]
+#[derive(Clone, Debug, Error, EnumDiscriminants, IntoStaticStr, FromRepr)]
 #[repr(u32)]
 #[strum(serialize_all = "snake_case")]
 #[strum_discriminants(
diff --git a/server/src/http/messages.rs b/server/src/http/messages.rs
index 70e4a84e..89a01557 100644
--- a/server/src/http/messages.rs
+++ b/server/src/http/messages.rs
@@ -5,7 +5,6 @@ use crate::http::COMPONENT;
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
 use crate::streaming::session::Session;
 use crate::streaming::systems::messages::PollingArgs;
-use crate::streaming::utils::random_id;
 use axum::extract::{Path, Query, State};
 use axum::http::StatusCode;
 use axum::routing::get;
diff --git a/server/src/quic/listener.rs b/server/src/quic/listener.rs
index b1abec11..a6395315 100644
--- a/server/src/quic/listener.rs
+++ b/server/src/quic/listener.rs
@@ -1,13 +1,10 @@
-use crate::binary::command;
+use crate::binary::command::{ServerCommand, ServerCommandHandler};
 use crate::binary::sender::SenderKind;
 use crate::server_error::ConnectionError;
 use crate::streaming::clients::client_manager::Transport;
 use crate::streaming::session::Session;
 use crate::streaming::systems::system::SharedSystem;
-use anyhow::{anyhow, Context};
-use bytes::Bytes;
-use iggy::validatable::Validatable;
-use iggy::{bytes_serializable::BytesSerializable, messages::MAX_PAYLOAD_SIZE};
+use anyhow::anyhow;
 use quinn::{Connection, Endpoint, RecvStream, SendStream};
 use tracing::{debug, error, info};
 
@@ -99,40 +96,60 @@ async fn handle_stream(
     system: SharedSystem,
     session: impl AsRef<Session>,
 ) -> anyhow::Result<()> {
-    // TODO: Fix me
-    /*
     let (send_stream, mut recv_stream) = stream;
-    // TODO: read to BytesMut instead of Vec<u8>
-    let request = recv_stream
-        .read_to_end(MAX_PAYLOAD_SIZE as usize)
-        .await
-        .with_context(|| "Error when reading the QUIC request.")?;
 
-    if request.len() < INITIAL_BYTES_LENGTH {
+    let mut length_buffer = [0u8; INITIAL_BYTES_LENGTH];
+    let mut code_buffer = [0u8; INITIAL_BYTES_LENGTH];
+
+    let length_len = match recv_stream.read(&mut length_buffer).await? {
+        Some(read_length) => read_length,
+        None => return Ok(()),
+    };
+
+    if length_len != INITIAL_BYTES_LENGTH {
         return Err(anyhow!(
-            "Unable to read the QUIC request length, expected: {INITIAL_BYTES_LENGTH} bytes, received: {} bytes.",
-            request.len()
+            "Unable to read the QUIC request length, expected: {INITIAL_BYTES_LENGTH} bytes, received: {length_len} bytes.",
         ));
     }
 
-    debug!("Trying to read command...");
-    let length = request[..INITIAL_BYTES_LENGTH]
-        .try_into()
-        .map(u32::from_le_bytes)
-        .unwrap_or_default();
-    let command =
-        ServerCommand::from_bytes(Bytes::copy_from_slice(&request[INITIAL_BYTES_LENGTH..]))
-            .with_context(|| "Error when reading the QUIC request command.")?;
-    command
-        .validate()
-        .with_context(|| "Error when validating the QUIC command.")?;
+    let code_len = match recv_stream.read(&mut code_buffer).await? {
+        Some(read_length) => read_length,
+        None => return Err(anyhow!("Connection closed before reading command code")),
+    };
 
-    debug!("Received a QUIC command: {command}, payload size: {length}");
+    if code_len != INITIAL_BYTES_LENGTH {
+        return Err(anyhow!(
+            "Unable to read the QUIC request code, expected: {INITIAL_BYTES_LENGTH} bytes, received: {code_len} bytes.",
+        ));
+    }
+
+    let length = u32::from_le_bytes(length_buffer);
+    let code = u32::from_le_bytes(code_buffer);
+
+    debug!("Received a QUIC request, length: {length}, code: {code}");
 
     let mut sender = SenderKind::get_quic_sender(send_stream, recv_stream);
-    command::handle(command, &mut sender, session.as_ref(), system.clone())
+
+    let command = match ServerCommand::from_code_and_reader(code, &mut sender, length - 4).await {
+        Ok(cmd) => cmd,
+        Err(e) => {
+            sender.send_error_response(e.clone()).await?;
+            return Err(anyhow!("Failed to parse command: {e}"));
+        }
+    };
+
+    if let Err(e) = command.validate() {
+        sender.send_error_response(e.clone()).await?;
+        return Err(anyhow!("Command validation failed: {e}"));
+    }
+
+    debug!("Received a QUIC command: {command}, payload size: {length}");
+
+    match command
+        .handle(&mut sender, length, session.as_ref(), &system)
         .await
-        .with_context(|| "Error when handling the QUIC request.")
-        */
-    todo!()
+    {
+        Ok(_) => Ok(()),
+        Err(e) => Err(anyhow!("Error handling command: {e}")),
+    }
 }
diff --git a/server/src/quic/quic_sender.rs b/server/src/quic/quic_sender.rs
index c80c124c..6fa7990e 100644
--- a/server/src/quic/quic_sender.rs
+++ b/server/src/quic/quic_sender.rs
@@ -1,10 +1,9 @@
-use std::io::IoSlice;
-
 use crate::quic::COMPONENT;
 use crate::{binary::sender::Sender, server_error::ServerError};
 use error_set::ErrContext;
 use iggy::error::IggyError;
 use quinn::{RecvStream, SendStream};
+use std::io::IoSlice;
 use tracing::{debug, error};
 
 const STATUS_OK: &[u8] = &[0; 4];
@@ -47,8 +46,48 @@ impl Sender for QuicSender {
         length: &[u8],
         slices: Vec<IoSlice<'_>>,
     ) -> Result<(), IggyError> {
-        //TODO: Fix me, quinn seems to have support for write vectored.
-        todo!()
+        debug!("Sending vectored response with status: {:?}...", STATUS_OK);
+
+        let headers = [STATUS_OK, length].concat();
+        self.send
+            .write_all(&headers)
+            .await
+            .with_error_context(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to write headers to stream")
+            })
+            .map_err(|_| IggyError::QuicError)?;
+
+        let mut total_bytes_written = 0;
+
+        for slice in slices {
+            let slice_data = &*slice;
+            if !slice_data.is_empty() {
+                self.send
+                    .write_all(slice_data)
+                    .await
+                    .with_error_context(|error| {
+                        format!("{COMPONENT} (error: {error}) - failed to write slice to stream")
+                    })
+                    .map_err(|_| IggyError::QuicError)?;
+
+                total_bytes_written += slice_data.len();
+            }
+        }
+
+        debug!(
+            "Sent vectored response: {} bytes of payload",
+            total_bytes_written
+        );
+
+        self.send
+            .finish()
+            .with_error_context(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to finish send stream")
+            })
+            .map_err(|_| IggyError::QuicError)?;
+
+        debug!("Sent vectored response with status: {:?}", STATUS_OK);
+        Ok(())
     }
 }
 

Reply via email to