This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch zero-copy-no-batching
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 9f9c1209953ba059f7fbef94cea2904f33454874
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Mar 24 02:20:58 2025 +0100

    new errors
---
 sdk/src/error.rs                           |  6 ++++
 sdk/src/messages/send_messages.rs          | 33 ++----------------
 sdk/src/models/messaging/messages_batch.rs | 55 +++++++++++++++++++++++++-----
 server/src/http/messages.rs                |  2 +-
 server/src/tcp/connection_handler.rs       |  2 +-
 5 files changed, 58 insertions(+), 40 deletions(-)

diff --git a/sdk/src/error.rs b/sdk/src/error.rs
index 94426947..42e3be0e 100644
--- a/sdk/src/error.rs
+++ b/sdk/src/error.rs
@@ -328,6 +328,12 @@ pub enum IggyError {
     InvalidKeyValueLength = 4028,
     #[error("Command length error: {0}")]
     CommandLengthError(String) = 4029,
+    #[error("Non-zero offset: {0} at index: {1}")]
+    NonZeroOffset(u64, u32) = 4030,
+    #[error("Non-zero timestamp: {0} at index: {1}")]
+    NonZeroTimestamp(u64, u32) = 4031,
+    #[error("Missing index: {0}")]
+    MissingIndex(u32) = 4032,
     #[error("Cannot sed messages due to client disconnection")]
     CannotSendMessagesDueToClientDisconnection = 4050,
     #[error("Invalid offset: {0}")]
diff --git a/sdk/src/messages/send_messages.rs 
b/sdk/src/messages/send_messages.rs
index 6633f48a..eacae3d5 100644
--- a/sdk/src/messages/send_messages.rs
+++ b/sdk/src/messages/send_messages.rs
@@ -68,10 +68,10 @@ impl SendMessages {
         bytes.put_bytes(0, indexes_size as usize);
 
         let mut msgs_size: u32 = 0;
-        for (cnt, message) in messages.iter().enumerate() {
+        for message in messages.iter() {
             message.write_to_buffer(&mut bytes);
             msgs_size += message.get_size_bytes().as_bytes_u64() as u32;
-            write_value_at(&mut bytes, (cnt as u32).to_le_bytes(), 
current_position);
+            write_value_at(&mut bytes, 0u64.to_le_bytes(), current_position);
             write_value_at(&mut bytes, msgs_size.to_le_bytes(), 
current_position + 4);
             write_value_at(&mut bytes, 0u64.to_le_bytes(), current_position + 
8);
             current_position += INDEX_SIZE;
@@ -116,10 +116,6 @@ impl Command for SendMessages {
 
 impl Validatable<IggyError> for SendMessages {
     fn validate(&self) -> Result<(), IggyError> {
-        if self.batch.is_empty() {
-            return Err(IggyError::InvalidMessagesCount);
-        }
-
         if self.partitioning.value.len() > 255
             || (self.partitioning.kind != PartitioningKind::Balanced
                 && self.partitioning.value.is_empty())
@@ -127,30 +123,7 @@ impl Validatable<IggyError> for SendMessages {
             return Err(IggyError::InvalidKeyValueLength);
         }
 
-        let mut payload_size = 0;
-        let mut message_count = 0;
-
-        for message in IggyMessageViewIterator::new(&self.batch) {
-            message_count += 1;
-
-            let message_payload = message.payload();
-            payload_size += message_payload.len() as u32;
-            if payload_size > MAX_PAYLOAD_SIZE {
-                return Err(IggyError::TooBigMessagePayload);
-            }
-
-            if message_payload.len() < IGGY_MESSAGE_HEADER_SIZE as usize {
-                return Err(IggyError::InvalidMessagePayloadLength);
-            }
-        }
-
-        if message_count == 0 {
-            return Err(IggyError::InvalidMessagesCount);
-        }
-
-        if payload_size == 0 {
-            return Err(IggyError::EmptyMessagePayload);
-        }
+        self.batch.validate()?;
 
         Ok(())
     }
diff --git a/sdk/src/models/messaging/messages_batch.rs 
b/sdk/src/models/messaging/messages_batch.rs
index 8b40791b..8fb1fa17 100644
--- a/sdk/src/models/messaging/messages_batch.rs
+++ b/sdk/src/models/messaging/messages_batch.rs
@@ -1,8 +1,8 @@
-use super::{IggyIndexes, IggyMessageView, IggyMessageViewIterator};
+use super::{IggyIndexes, IggyMessageView, IggyMessageViewIterator, 
IGGY_MESSAGE_HEADER_SIZE};
 use crate::{
     error::IggyError,
     models::messaging::INDEX_SIZE,
-    prelude::{BytesSerializable, IggyByteSize, Sizeable},
+    prelude::{BytesSerializable, IggyByteSize, Sizeable, Validatable},
 };
 use bytes::{BufMut, Bytes, BytesMut};
 use serde::{Deserialize, Serialize};
@@ -15,7 +15,6 @@ use std::ops::{Deref, Index};
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
 pub struct IggyMessagesBatch {
     /// The number of messages in the batch
-    #[serde(skip)]
     count: u32,
     /// The byte-indexes of messages in the buffer, represented as array of 
u32's
     /// Offsets are relative
@@ -349,11 +348,6 @@ impl IggyMessagesBatch {
             messages: messages_buffer.freeze(),
         }
     }
-
-    #[cfg(test)]
-    pub fn get_indexes(&self) -> &crate::models::messaging::IggyIndexes {
-        &self.indexes
-    }
 }
 
 impl Index<usize> for IggyMessagesBatch {
@@ -408,6 +402,51 @@ impl BytesSerializable for IggyMessagesBatch {
     }
 }
 
+impl Validatable<IggyError> for IggyMessagesBatch {
+    fn validate(&self) -> Result<(), IggyError> {
+        // TODO(hubcio): fix validation
+        if self.is_empty() {
+            return Err(IggyError::InvalidMessagesCount);
+        }
+
+        let mut payload_size = 0;
+
+        for i in 0..self.count() {
+            if let Some(index_view) = self.indexes.get(i) {
+                if index_view.offset() != 0 {
+                    tracing::error!("Non-zero offset {} at index: {}", 
index_view.offset(), i);
+                    return Err(IggyError::NonZeroOffset(index_view.offset() as 
u64, i));
+                }
+                if index_view.timestamp() != 0 {
+                    tracing::error!(
+                        "Non-zero timestamp {} at index: {}",
+                        index_view.timestamp(),
+                        i
+                    );
+                    return 
Err(IggyError::NonZeroTimestamp(index_view.timestamp(), i));
+                }
+            } else {
+                tracing::error!("Index {} is missing", i);
+                return Err(IggyError::MissingIndex(i));
+            }
+
+            if let Some(message) = self.get(i as usize) {
+                let message_payload = message.payload();
+                payload_size += message_payload.len() as u32;
+            } else {
+                tracing::error!("Missing index {}", i);
+                return Err(IggyError::MissingIndex(i));
+            }
+        }
+
+        if payload_size == 0 {
+            return Err(IggyError::EmptyMessagePayload);
+        }
+
+        Ok(())
+    }
+}
+
 impl Sizeable for IggyMessagesBatch {
     fn get_size_bytes(&self) -> IggyByteSize {
         IggyByteSize::from(self.messages.len() as u64)
diff --git a/server/src/http/messages.rs b/server/src/http/messages.rs
index 970ce2f2..89a01557 100644
--- a/server/src/http/messages.rs
+++ b/server/src/http/messages.rs
@@ -79,7 +79,7 @@ async fn send_messages(
     //         msg.id = random_id::get_uuid();
     //     }
     // });
-    // command.validate()?;
+    command.validate()?;
 
     let batch = make_mutable(command.batch);
     let command_stream_id = command.stream_id;
diff --git a/server/src/tcp/connection_handler.rs 
b/server/src/tcp/connection_handler.rs
index 0e68ce5d..2d99b9be 100644
--- a/server/src/tcp/connection_handler.rs
+++ b/server/src/tcp/connection_handler.rs
@@ -41,7 +41,7 @@ pub(crate) async fn handle_connection(
         let length = u32::from_le_bytes(length_buffer);
         sender.read(&mut code_buffer).await?;
         let code = u32::from_le_bytes(code_buffer);
-        tracing::error!("Received a TCP request, length: {length}, code: 
{code}");
+        debug!("Received a TCP request, length: {length}, code: {code}");
         let command = ServerCommand::from_code_and_reader(code, sender, length 
- 4).await?;
         debug!("Received a TCP command: {command}, payload size: {length}");
         command.handle(sender, length, &session, &system).await?;

Reply via email to