This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch zero-copy-no-batching in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 9f9c1209953ba059f7fbef94cea2904f33454874 Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Mar 24 02:20:58 2025 +0100 new errors --- sdk/src/error.rs | 6 ++++ sdk/src/messages/send_messages.rs | 33 ++---------------- sdk/src/models/messaging/messages_batch.rs | 55 +++++++++++++++++++++++++----- server/src/http/messages.rs | 2 +- server/src/tcp/connection_handler.rs | 2 +- 5 files changed, 58 insertions(+), 40 deletions(-) diff --git a/sdk/src/error.rs b/sdk/src/error.rs index 94426947..42e3be0e 100644 --- a/sdk/src/error.rs +++ b/sdk/src/error.rs @@ -328,6 +328,12 @@ pub enum IggyError { InvalidKeyValueLength = 4028, #[error("Command length error: {0}")] CommandLengthError(String) = 4029, + #[error("Non-zero offset: {0} at index: {1}")] + NonZeroOffset(u64, u32) = 4030, + #[error("Non-zero timestamp: {0} at index: {1}")] + NonZeroTimestamp(u64, u32) = 4031, + #[error("Missing index: {0}")] + MissingIndex(u32) = 4032, #[error("Cannot sed messages due to client disconnection")] CannotSendMessagesDueToClientDisconnection = 4050, #[error("Invalid offset: {0}")] diff --git a/sdk/src/messages/send_messages.rs b/sdk/src/messages/send_messages.rs index 6633f48a..eacae3d5 100644 --- a/sdk/src/messages/send_messages.rs +++ b/sdk/src/messages/send_messages.rs @@ -68,10 +68,10 @@ impl SendMessages { bytes.put_bytes(0, indexes_size as usize); let mut msgs_size: u32 = 0; - for (cnt, message) in messages.iter().enumerate() { + for message in messages.iter() { message.write_to_buffer(&mut bytes); msgs_size += message.get_size_bytes().as_bytes_u64() as u32; - write_value_at(&mut bytes, (cnt as u32).to_le_bytes(), current_position); + write_value_at(&mut bytes, 0u64.to_le_bytes(), current_position); write_value_at(&mut bytes, msgs_size.to_le_bytes(), current_position + 4); write_value_at(&mut bytes, 0u64.to_le_bytes(), current_position + 8); current_position += INDEX_SIZE; @@ -116,10 +116,6 @@ impl Command for SendMessages { impl Validatable<IggyError> for SendMessages { fn validate(&self) -> Result<(), IggyError> { - if self.batch.is_empty() { - return Err(IggyError::InvalidMessagesCount); - } - if self.partitioning.value.len() > 255 || (self.partitioning.kind != PartitioningKind::Balanced && self.partitioning.value.is_empty()) @@ -127,30 +123,7 @@ impl Validatable<IggyError> for SendMessages { return Err(IggyError::InvalidKeyValueLength); } - let mut payload_size = 0; - let mut message_count = 0; - - for message in IggyMessageViewIterator::new(&self.batch) { - message_count += 1; - - let message_payload = message.payload(); - payload_size += message_payload.len() as u32; - if payload_size > MAX_PAYLOAD_SIZE { - return Err(IggyError::TooBigMessagePayload); - } - - if message_payload.len() < IGGY_MESSAGE_HEADER_SIZE as usize { - return Err(IggyError::InvalidMessagePayloadLength); - } - } - - if message_count == 0 { - return Err(IggyError::InvalidMessagesCount); - } - - if payload_size == 0 { - return Err(IggyError::EmptyMessagePayload); - } + self.batch.validate()?; Ok(()) } diff --git a/sdk/src/models/messaging/messages_batch.rs b/sdk/src/models/messaging/messages_batch.rs index 8b40791b..8fb1fa17 100644 --- a/sdk/src/models/messaging/messages_batch.rs +++ b/sdk/src/models/messaging/messages_batch.rs @@ -1,8 +1,8 @@ -use super::{IggyIndexes, IggyMessageView, IggyMessageViewIterator}; +use super::{IggyIndexes, IggyMessageView, IggyMessageViewIterator, IGGY_MESSAGE_HEADER_SIZE}; use crate::{ error::IggyError, models::messaging::INDEX_SIZE, - prelude::{BytesSerializable, IggyByteSize, Sizeable}, + prelude::{BytesSerializable, IggyByteSize, Sizeable, Validatable}, }; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -15,7 +15,6 @@ use std::ops::{Deref, Index}; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct IggyMessagesBatch { /// The number of messages in the batch - #[serde(skip)] count: u32, /// The byte-indexes of messages in the buffer, represented as array of u32's /// Offsets are relative @@ -349,11 +348,6 @@ impl IggyMessagesBatch { messages: messages_buffer.freeze(), } } - - #[cfg(test)] - pub fn get_indexes(&self) -> &crate::models::messaging::IggyIndexes { - &self.indexes - } } impl Index<usize> for IggyMessagesBatch { @@ -408,6 +402,51 @@ impl BytesSerializable for IggyMessagesBatch { } } +impl Validatable<IggyError> for IggyMessagesBatch { + fn validate(&self) -> Result<(), IggyError> { + // TODO(hubcio): fix validation + if self.is_empty() { + return Err(IggyError::InvalidMessagesCount); + } + + let mut payload_size = 0; + + for i in 0..self.count() { + if let Some(index_view) = self.indexes.get(i) { + if index_view.offset() != 0 { + tracing::error!("Non-zero offset {} at index: {}", index_view.offset(), i); + return Err(IggyError::NonZeroOffset(index_view.offset() as u64, i)); + } + if index_view.timestamp() != 0 { + tracing::error!( + "Non-zero timestamp {} at index: {}", + index_view.timestamp(), + i + ); + return Err(IggyError::NonZeroTimestamp(index_view.timestamp(), i)); + } + } else { + tracing::error!("Index {} is missing", i); + return Err(IggyError::MissingIndex(i)); + } + + if let Some(message) = self.get(i as usize) { + let message_payload = message.payload(); + payload_size += message_payload.len() as u32; + } else { + tracing::error!("Missing index {}", i); + return Err(IggyError::MissingIndex(i)); + } + } + + if payload_size == 0 { + return Err(IggyError::EmptyMessagePayload); + } + + Ok(()) + } +} + impl Sizeable for IggyMessagesBatch { fn get_size_bytes(&self) -> IggyByteSize { IggyByteSize::from(self.messages.len() as u64) diff --git a/server/src/http/messages.rs b/server/src/http/messages.rs index 970ce2f2..89a01557 100644 --- a/server/src/http/messages.rs +++ b/server/src/http/messages.rs @@ -79,7 +79,7 @@ async fn send_messages( // msg.id = random_id::get_uuid(); // } // }); - // command.validate()?; + command.validate()?; let batch = make_mutable(command.batch); let command_stream_id = command.stream_id; diff --git a/server/src/tcp/connection_handler.rs b/server/src/tcp/connection_handler.rs index 0e68ce5d..2d99b9be 100644 --- a/server/src/tcp/connection_handler.rs +++ b/server/src/tcp/connection_handler.rs @@ -41,7 +41,7 @@ pub(crate) async fn handle_connection( let length = u32::from_le_bytes(length_buffer); sender.read(&mut code_buffer).await?; let code = u32::from_le_bytes(code_buffer); - tracing::error!("Received a TCP request, length: {length}, code: {code}"); + debug!("Received a TCP request, length: {length}, code: {code}"); let command = ServerCommand::from_code_and_reader(code, sender, length - 4).await?; debug!("Received a TCP command: {command}, payload size: {length}"); command.handle(sender, length, &session, &system).await?;
