This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch deserialization-hardening
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 7f90a165d9ab8afeb68cbf04e26688face6eb4d4
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Feb 16 10:59:58 2026 +0100

    fix(security): prevent server crash from malformed command payloads
    
    Any client sending a short or malformed binary payload
    could panic the server via unchecked slice indexing in
    from_bytes deserialization. The panic propagated through
    catch_unwind in spawn_connection, which then sent a stop
    signal to ALL shards — taking down the entire server from
    a single bad packet.
    
    Replace direct slice indexing (bytes[a..b]) with checked
    access (bytes.get(a..b).ok_or(...)?) across all 11 vulnerable
    command deserializers, Identifier, IggyMessageView, and the
    send_messages handler. LoginUser additionally makes version
    and context fields optional for backward compatibility with
    older SDKs (e.g. v0.8.0) that omit them.
---
 .../consumer_groups/create_consumer_group.rs       |  30 ++++-
 .../delete_personal_access_token.rs                |  36 ++++-
 .../login_with_personal_access_token.rs            |  36 ++++-
 core/common/src/commands/streams/create_stream.rs  |  36 ++++-
 core/common/src/commands/topics/create_topic.rs    |  56 ++++++--
 core/common/src/commands/topics/update_topic.rs    |  52 ++++++--
 core/common/src/commands/users/change_password.rs  |  43 ++++--
 core/common/src/commands/users/create_user.rs      |  64 +++++++--
 core/common/src/commands/users/login_user.rs       | 148 ++++++++++++++++-----
 core/common/src/commands/users/update_user.rs      |  41 +++++-
 core/common/src/types/identifier/mod.rs            |  69 ++++++++--
 core/common/src/types/message/message_view.rs      |  24 +++-
 .../handlers/messages/send_messages_handler.rs     |   6 +-
 13 files changed, 532 insertions(+), 109 deletions(-)

diff --git a/core/common/src/commands/consumer_groups/create_consumer_group.rs 
b/core/common/src/commands/consumer_groups/create_consumer_group.rs
index c03d39964..f697bc9c7 100644
--- a/core/common/src/commands/consumer_groups/create_consumer_group.rs
+++ b/core/common/src/commands/consumer_groups/create_consumer_group.rs
@@ -96,10 +96,14 @@ impl BytesSerializable for CreateConsumerGroup {
         position += stream_id.get_size_bytes().as_bytes_usize();
         let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
         position += topic_id.get_size_bytes().as_bytes_usize();
-        let name_length = bytes[position];
-        let name = from_utf8(&bytes[position + 1..position + 1 + name_length 
as usize])
-            .map_err(|_| IggyError::InvalidUtf8)?
-            .to_string();
+        let name_length = 
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
+        let name = from_utf8(
+            bytes
+                .get(position + 1..position + 1 + name_length as usize)
+                .ok_or(IggyError::InvalidCommand)?,
+        )
+        .map_err(|_| IggyError::InvalidUtf8)?
+        .to_string();
         let command = CreateConsumerGroup {
             stream_id,
             topic_id,
@@ -142,6 +146,24 @@ mod tests {
         assert_eq!(name, command.name);
     }
 
+    #[test]
+    fn from_bytes_should_fail_on_empty_input() {
+        assert!(CreateConsumerGroup::from_bytes(Bytes::new()).is_err());
+    }
+
+    #[test]
+    fn from_bytes_should_fail_on_truncated_input() {
+        let command = CreateConsumerGroup::default();
+        let bytes = command.to_bytes();
+        for i in 0..bytes.len() - 1 {
+            let truncated = bytes.slice(..i);
+            assert!(
+                CreateConsumerGroup::from_bytes(truncated).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
     #[test]
     fn should_be_deserialized_from_bytes() {
         let stream_id = Identifier::numeric(1).unwrap();
diff --git 
a/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs
 
b/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs
index 064f78c48..e0e38e32d 100644
--- 
a/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs
+++ 
b/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs
@@ -77,9 +77,13 @@ impl BytesSerializable for DeletePersonalAccessToken {
         }
 
         let name_length = bytes[0];
-        let name = from_utf8(&bytes[1..1 + name_length as usize])
-            .map_err(|_| IggyError::InvalidUtf8)?
-            .to_string();
+        let name = from_utf8(
+            bytes
+                .get(1..1 + name_length as usize)
+                .ok_or(IggyError::InvalidCommand)?,
+        )
+        .map_err(|_| IggyError::InvalidUtf8)?
+        .to_string();
         if name.len() != name_length as usize {
             return Err(IggyError::InvalidCommand);
         }
@@ -112,6 +116,32 @@ mod tests {
         assert_eq!(name, command.name);
     }
 
+    #[test]
+    fn from_bytes_should_fail_on_empty_input() {
+        assert!(DeletePersonalAccessToken::from_bytes(Bytes::new()).is_err());
+    }
+
+    #[test]
+    fn from_bytes_should_fail_on_truncated_input() {
+        let command = DeletePersonalAccessToken::default();
+        let bytes = command.to_bytes();
+        for i in 0..bytes.len() - 1 {
+            let truncated = bytes.slice(..i);
+            assert!(
+                DeletePersonalAccessToken::from_bytes(truncated).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
+    #[test]
+    fn from_bytes_should_fail_on_corrupted_name_length() {
+        let mut buf = BytesMut::new();
+        buf.put_u8(255);
+        buf.put_slice(b"short");
+        assert!(DeletePersonalAccessToken::from_bytes(buf.freeze()).is_err());
+    }
+
     #[test]
     fn should_be_deserialized_from_bytes() {
         let name = "test";
diff --git 
a/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs
 
b/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs
index cdba04e37..52405748f 100644
--- 
a/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs
+++ 
b/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs
@@ -74,9 +74,13 @@ impl BytesSerializable for LoginWithPersonalAccessToken {
         }
 
         let token_length = bytes[0];
-        let token = from_utf8(&bytes[1..1 + token_length as usize])
-            .map_err(|_| IggyError::InvalidUtf8)?
-            .to_string();
+        let token = from_utf8(
+            bytes
+                .get(1..1 + token_length as usize)
+                .ok_or(IggyError::InvalidCommand)?,
+        )
+        .map_err(|_| IggyError::InvalidUtf8)?
+        .to_string();
         if token.len() != token_length as usize {
             return Err(IggyError::InvalidCommand);
         }
@@ -109,6 +113,32 @@ mod tests {
         assert_eq!(token, command.token);
     }
 
+    #[test]
+    fn from_bytes_should_fail_on_empty_input() {
+        
assert!(LoginWithPersonalAccessToken::from_bytes(Bytes::new()).is_err());
+    }
+
+    #[test]
+    fn from_bytes_should_fail_on_truncated_input() {
+        let command = LoginWithPersonalAccessToken::default();
+        let bytes = command.to_bytes();
+        for i in 0..bytes.len() - 1 {
+            let truncated = bytes.slice(..i);
+            assert!(
+                LoginWithPersonalAccessToken::from_bytes(truncated).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
+    #[test]
+    fn from_bytes_should_fail_on_corrupted_token_length() {
+        let mut buf = BytesMut::new();
+        buf.put_u8(255);
+        buf.put_slice(b"short");
+        
assert!(LoginWithPersonalAccessToken::from_bytes(buf.freeze()).is_err());
+    }
+
     #[test]
     fn should_be_deserialized_from_bytes() {
         let token = "test";
diff --git a/core/common/src/commands/streams/create_stream.rs 
b/core/common/src/commands/streams/create_stream.rs
index 9ff2fec5a..7dd75f034 100644
--- a/core/common/src/commands/streams/create_stream.rs
+++ b/core/common/src/commands/streams/create_stream.rs
@@ -74,9 +74,13 @@ impl BytesSerializable for CreateStream {
         }
 
         let name_length = bytes[0];
-        let name = from_utf8(&bytes[1..1 + name_length as usize])
-            .map_err(|_| IggyError::InvalidUtf8)?
-            .to_string();
+        let name = from_utf8(
+            bytes
+                .get(1..1 + name_length as usize)
+                .ok_or(IggyError::InvalidCommand)?,
+        )
+        .map_err(|_| IggyError::InvalidUtf8)?
+        .to_string();
         if name.len() != name_length as usize {
             return Err(IggyError::InvalidCommand);
         }
@@ -110,6 +114,32 @@ mod tests {
         assert_eq!(name, command.name);
     }
 
+    #[test]
+    fn from_bytes_should_fail_on_empty_input() {
+        assert!(CreateStream::from_bytes(Bytes::new()).is_err());
+    }
+
+    #[test]
+    fn from_bytes_should_fail_on_truncated_input() {
+        let command = CreateStream::default();
+        let bytes = command.to_bytes();
+        for i in 0..bytes.len() - 1 {
+            let truncated = bytes.slice(..i);
+            assert!(
+                CreateStream::from_bytes(truncated).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
+    #[test]
+    fn from_bytes_should_fail_on_corrupted_name_length() {
+        let mut buf = BytesMut::new();
+        buf.put_u8(255);
+        buf.put_slice(b"short");
+        assert!(CreateStream::from_bytes(buf.freeze()).is_err());
+    }
+
     #[test]
     fn should_be_deserialized_from_bytes() {
         let name = "test".to_string();
diff --git a/core/common/src/commands/topics/create_topic.rs 
b/core/common/src/commands/topics/create_topic.rs
index 48cd3d525..47d2acc70 100644
--- a/core/common/src/commands/topics/create_topic.rs
+++ b/core/common/src/commands/topics/create_topic.rs
@@ -127,31 +127,43 @@ impl BytesSerializable for CreateTopic {
         let stream_id = Identifier::from_bytes(bytes.clone())?;
         position += stream_id.get_size_bytes().as_bytes_usize();
         let partitions_count = u32::from_le_bytes(
-            bytes[position..position + 4]
+            bytes
+                .get(position..position + 4)
+                .ok_or(IggyError::InvalidCommand)?
                 .try_into()
                 .map_err(|_| IggyError::InvalidNumberEncoding)?,
         );
-        let compression_algorithm = 
CompressionAlgorithm::from_code(bytes[position + 4])?;
+        let compression_algorithm = CompressionAlgorithm::from_code(
+            *bytes.get(position + 4).ok_or(IggyError::InvalidCommand)?,
+        )?;
         let message_expiry = u64::from_le_bytes(
-            bytes[position + 5..position + 13]
+            bytes
+                .get(position + 5..position + 13)
+                .ok_or(IggyError::InvalidCommand)?
                 .try_into()
                 .map_err(|_| IggyError::InvalidNumberEncoding)?,
         );
         let message_expiry: IggyExpiry = message_expiry.into();
         let max_topic_size = u64::from_le_bytes(
-            bytes[position + 13..position + 21]
+            bytes
+                .get(position + 13..position + 21)
+                .ok_or(IggyError::InvalidCommand)?
                 .try_into()
                 .map_err(|_| IggyError::InvalidNumberEncoding)?,
         );
         let max_topic_size: MaxTopicSize = max_topic_size.into();
-        let replication_factor = match bytes[position + 21] {
+        let replication_factor = match *bytes.get(position + 
21).ok_or(IggyError::InvalidCommand)? {
             0 => None,
             factor => Some(factor),
         };
-        let name_length = bytes[position + 22];
-        let name = from_utf8(&bytes[position + 23..(position + 23 + 
name_length as usize)])
-            .map_err(|_| IggyError::InvalidUtf8)?
-            .to_string();
+        let name_length = *bytes.get(position + 
22).ok_or(IggyError::InvalidCommand)?;
+        let name = from_utf8(
+            bytes
+                .get(position + 23..position + 23 + name_length as usize)
+                .ok_or(IggyError::InvalidCommand)?,
+        )
+        .map_err(|_| IggyError::InvalidUtf8)?
+        .to_string();
         if name.len() != name_length as usize {
             return Err(IggyError::InvalidCommand);
         }
@@ -229,6 +241,32 @@ mod tests {
         assert_eq!(name, command.name);
     }
 
+    #[test]
+    fn from_bytes_should_fail_on_empty_input() {
+        assert!(CreateTopic::from_bytes(Bytes::new()).is_err());
+    }
+
+    #[test]
+    fn from_bytes_should_fail_on_truncated_input() {
+        let command = CreateTopic {
+            stream_id: Identifier::numeric(1).unwrap(),
+            partitions_count: 3,
+            compression_algorithm: CompressionAlgorithm::None,
+            message_expiry: IggyExpiry::NeverExpire,
+            max_topic_size: MaxTopicSize::ServerDefault,
+            replication_factor: Some(1),
+            name: "test".to_string(),
+        };
+        let bytes = command.to_bytes();
+        for i in 0..bytes.len() - 1 {
+            let truncated = bytes.slice(..i);
+            assert!(
+                CreateTopic::from_bytes(truncated).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
     #[test]
     fn should_be_deserialized_from_bytes() {
         let stream_id = Identifier::numeric(1).unwrap();
diff --git a/core/common/src/commands/topics/update_topic.rs 
b/core/common/src/commands/topics/update_topic.rs
index 83dadb69c..35fe67fce 100644
--- a/core/common/src/commands/topics/update_topic.rs
+++ b/core/common/src/commands/topics/update_topic.rs
@@ -128,28 +128,38 @@ impl BytesSerializable for UpdateTopic {
         position += stream_id.get_size_bytes().as_bytes_usize();
         let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
         position += topic_id.get_size_bytes().as_bytes_usize();
-        let compression_algorithm = 
CompressionAlgorithm::from_code(bytes[position])?;
+        let compression_algorithm = CompressionAlgorithm::from_code(
+            *bytes.get(position).ok_or(IggyError::InvalidCommand)?,
+        )?;
         position += 1;
         let message_expiry = u64::from_le_bytes(
-            bytes[position..position + 8]
+            bytes
+                .get(position..position + 8)
+                .ok_or(IggyError::InvalidCommand)?
                 .try_into()
                 .map_err(|_| IggyError::InvalidNumberEncoding)?,
         );
         let message_expiry: IggyExpiry = message_expiry.into();
         let max_topic_size = u64::from_le_bytes(
-            bytes[position + 8..position + 16]
+            bytes
+                .get(position + 8..position + 16)
+                .ok_or(IggyError::InvalidCommand)?
                 .try_into()
                 .map_err(|_| IggyError::InvalidNumberEncoding)?,
         );
         let max_topic_size: MaxTopicSize = max_topic_size.into();
-        let replication_factor = match bytes[position + 16] {
+        let replication_factor = match *bytes.get(position + 
16).ok_or(IggyError::InvalidCommand)? {
             0 => None,
             factor => Some(factor),
         };
-        let name_length = bytes[position + 17];
-        let name = from_utf8(&bytes[position + 18..(position + 18 + 
name_length as usize)])
-            .map_err(|_| IggyError::InvalidUtf8)?
-            .to_string();
+        let name_length = *bytes.get(position + 
17).ok_or(IggyError::InvalidCommand)?;
+        let name = from_utf8(
+            bytes
+                .get(position + 18..position + 18 + name_length as usize)
+                .ok_or(IggyError::InvalidCommand)?,
+        )
+        .map_err(|_| IggyError::InvalidUtf8)?
+        .to_string();
         if name.len() != name_length as usize {
             return Err(IggyError::InvalidCommand);
         }
@@ -229,6 +239,32 @@ mod tests {
         assert_eq!(name, command.name);
     }
 
+    #[test]
+    fn from_bytes_should_fail_on_empty_input() {
+        assert!(UpdateTopic::from_bytes(Bytes::new()).is_err());
+    }
+
+    #[test]
+    fn from_bytes_should_fail_on_truncated_input() {
+        let command = UpdateTopic {
+            stream_id: Identifier::numeric(1).unwrap(),
+            topic_id: Identifier::numeric(2).unwrap(),
+            compression_algorithm: CompressionAlgorithm::None,
+            message_expiry: IggyExpiry::NeverExpire,
+            max_topic_size: MaxTopicSize::ServerDefault,
+            replication_factor: Some(1),
+            name: "test".to_string(),
+        };
+        let bytes = command.to_bytes();
+        for i in 0..bytes.len() - 1 {
+            let truncated = bytes.slice(..i);
+            assert!(
+                UpdateTopic::from_bytes(truncated).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
     #[test]
     fn should_be_deserialized_from_bytes() {
         let stream_id = Identifier::numeric(1).unwrap();
diff --git a/core/common/src/commands/users/change_password.rs 
b/core/common/src/commands/users/change_password.rs
index 4a5663778..ba8e251db 100644
--- a/core/common/src/commands/users/change_password.rs
+++ b/core/common/src/commands/users/change_password.rs
@@ -101,18 +101,25 @@ impl BytesSerializable for ChangePassword {
 
         let user_id = Identifier::from_bytes(bytes.clone())?;
         let mut position = user_id.get_size_bytes().as_bytes_usize();
-        let current_password_length = bytes[position];
+        let current_password_length = 
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
         position += 1;
-        let current_password =
-            from_utf8(&bytes[position..position + current_password_length as 
usize])
-                .map_err(|_| IggyError::InvalidUtf8)?
-                .to_string();
+        let current_password = from_utf8(
+            bytes
+                .get(position..position + current_password_length as usize)
+                .ok_or(IggyError::InvalidCommand)?,
+        )
+        .map_err(|_| IggyError::InvalidUtf8)?
+        .to_string();
         position += current_password_length as usize;
-        let new_password_length = bytes[position];
+        let new_password_length = 
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
         position += 1;
-        let new_password = from_utf8(&bytes[position..position + 
new_password_length as usize])
-            .map_err(|_| IggyError::InvalidUtf8)?
-            .to_string();
+        let new_password = from_utf8(
+            bytes
+                .get(position..position + new_password_length as usize)
+                .ok_or(IggyError::InvalidCommand)?,
+        )
+        .map_err(|_| IggyError::InvalidUtf8)?
+        .to_string();
 
         let command = ChangePassword {
             user_id,
@@ -160,6 +167,24 @@ mod tests {
         assert_eq!(new_password, command.new_password);
     }
 
+    #[test]
+    fn from_bytes_should_fail_on_empty_input() {
+        assert!(ChangePassword::from_bytes(Bytes::new()).is_err());
+    }
+
+    #[test]
+    fn from_bytes_should_fail_on_truncated_input() {
+        let command = ChangePassword::default();
+        let bytes = command.to_bytes();
+        for i in 0..bytes.len() - 1 {
+            let truncated = bytes.slice(..i);
+            assert!(
+                ChangePassword::from_bytes(truncated).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
     #[test]
     fn should_be_deserialized_from_bytes() {
         let user_id = Identifier::numeric(1).unwrap();
diff --git a/core/common/src/commands/users/create_user.rs 
b/core/common/src/commands/users/create_user.rs
index 7d23c54ee..b273a60a8 100644
--- a/core/common/src/commands/users/create_user.rs
+++ b/core/common/src/commands/users/create_user.rs
@@ -111,27 +111,35 @@ impl BytesSerializable for CreateUser {
         }
 
         let username_length = bytes[0];
-        let username = from_utf8(&bytes[1..1 + username_length as usize])
-            .map_err(|_| IggyError::InvalidUtf8)?
-            .to_string();
+        let username = from_utf8(
+            bytes
+                .get(1..1 + username_length as usize)
+                .ok_or(IggyError::InvalidCommand)?,
+        )
+        .map_err(|_| IggyError::InvalidUtf8)?
+        .to_string();
         if username.len() != username_length as usize {
             return Err(IggyError::InvalidCommand);
         }
 
         let mut position = 1 + username_length as usize;
-        let password_length = bytes[position];
+        let password_length = 
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
         position += 1;
-        let password = from_utf8(&bytes[position..position + password_length 
as usize])
-            .map_err(|_| IggyError::InvalidUtf8)?
-            .to_string();
+        let password = from_utf8(
+            bytes
+                .get(position..position + password_length as usize)
+                .ok_or(IggyError::InvalidCommand)?,
+        )
+        .map_err(|_| IggyError::InvalidUtf8)?
+        .to_string();
         if password.len() != password_length as usize {
             return Err(IggyError::InvalidCommand);
         }
 
         position += password_length as usize;
-        let status = UserStatus::from_code(bytes[position])?;
+        let status = 
UserStatus::from_code(*bytes.get(position).ok_or(IggyError::InvalidCommand)?)?;
         position += 1;
-        let has_permissions = bytes[position];
+        let has_permissions = 
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
         if has_permissions > 1 {
             return Err(IggyError::InvalidCommand);
         }
@@ -139,14 +147,18 @@ impl BytesSerializable for CreateUser {
         position += 1;
         let permissions = if has_permissions == 1 {
             let permissions_length = u32::from_le_bytes(
-                bytes[position..position + 4]
+                bytes
+                    .get(position..position + 4)
+                    .ok_or(IggyError::InvalidCommand)?
                     .try_into()
                     .map_err(|_| IggyError::InvalidNumberEncoding)?,
             );
             position += 4;
-            Some(Permissions::from_bytes(
-                bytes.slice(position..position + permissions_length as usize),
-            )?)
+            let end = position + permissions_length as usize;
+            if end > bytes.len() {
+                return Err(IggyError::InvalidCommand);
+            }
+            Some(Permissions::from_bytes(bytes.slice(position..end))?)
         } else {
             None
         };
@@ -232,6 +244,32 @@ mod tests {
         assert_eq!(permissions, command.permissions.unwrap());
     }
 
+    #[test]
+    fn from_bytes_should_fail_on_empty_input() {
+        assert!(CreateUser::from_bytes(Bytes::new()).is_err());
+    }
+
+    #[test]
+    fn from_bytes_should_fail_on_truncated_input() {
+        let command = CreateUser::default();
+        let bytes = command.to_bytes();
+        for i in 0..bytes.len() - 1 {
+            let truncated = bytes.slice(..i);
+            assert!(
+                CreateUser::from_bytes(truncated).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
+    #[test]
+    fn from_bytes_should_fail_on_corrupted_username_length() {
+        let mut buf = BytesMut::new();
+        buf.put_u8(255);
+        buf.put_slice(b"short");
+        assert!(CreateUser::from_bytes(buf.freeze()).is_err());
+    }
+
     #[test]
     fn should_be_deserialized_from_bytes() {
         let username = "user";
diff --git a/core/common/src/commands/users/login_user.rs 
b/core/common/src/commands/users/login_user.rs
index edd1e8426..e0005e8b8 100644
--- a/core/common/src/commands/users/login_user.rs
+++ b/core/common/src/commands/users/login_user.rs
@@ -115,17 +115,23 @@ impl BytesSerializable for LoginUser {
         }
 
         let username_length = bytes[0];
-        let username = from_utf8(&bytes[1..=(username_length as usize)])
-            .map_err(|_| IggyError::InvalidUtf8)?
-            .to_string();
+        let username = from_utf8(
+            bytes
+                .get(1..1 + username_length as usize)
+                .ok_or(IggyError::InvalidCommand)?,
+        )
+        .map_err(|_| IggyError::InvalidUtf8)?
+        .to_string();
         if username.len() != username_length as usize {
             return Err(IggyError::InvalidCommand);
         }
 
-        let password_length = bytes[1 + username_length as usize];
+        let pos = 1 + username_length as usize;
+        let password_length = 
*bytes.get(pos).ok_or(IggyError::InvalidCommand)?;
         let password = from_utf8(
-            &bytes[2 + username_length as usize
-                ..2 + username_length as usize + password_length as usize],
+            bytes
+                .get(pos + 1..pos + 1 + password_length as usize)
+                .ok_or(IggyError::InvalidCommand)?,
         )
         .map_err(|_| IggyError::InvalidUtf8)?
         .to_string();
@@ -133,37 +139,57 @@ impl BytesSerializable for LoginUser {
             return Err(IggyError::InvalidCommand);
         }
 
-        let position = 2 + username_length as usize + password_length as usize;
-        let version_length = u32::from_le_bytes(
-            bytes[position..position + 4]
-                .try_into()
-                .map_err(|_| IggyError::InvalidNumberEncoding)?,
-        );
-        let version = match version_length {
-            0 => None,
-            _ => {
-                let version =
-                    from_utf8(&bytes[position + 4..position + 4 + 
version_length as usize])
-                        .map_err(|_| IggyError::InvalidUtf8)?
-                        .to_string();
-                Some(version)
+        let mut position = pos + 1 + password_length as usize;
+
+        // Version and context fields are optional for backward compatibility
+        // with older SDKs (e.g. v0.8.0) that don't send them.
+        let version = if let Some(len_bytes) = bytes.get(position..position + 
4) {
+            let version_length = u32::from_le_bytes(
+                len_bytes
+                    .try_into()
+                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
+            );
+            position += 4;
+            match version_length {
+                0 => None,
+                _ => {
+                    let version = from_utf8(
+                        bytes
+                            .get(position..position + version_length as usize)
+                            .ok_or(IggyError::InvalidCommand)?,
+                    )
+                    .map_err(|_| IggyError::InvalidUtf8)?
+                    .to_string();
+                    position += version_length as usize;
+                    Some(version)
+                }
             }
+        } else {
+            None
         };
-        let position = position + 4 + version_length as usize;
-        let context_length = u32::from_le_bytes(
-            bytes[position..position + 4]
-                .try_into()
-                .map_err(|_| IggyError::InvalidNumberEncoding)?,
-        );
-        let context = match context_length {
-            0 => None,
-            _ => {
-                let context =
-                    from_utf8(&bytes[position + 4..position + 4 + 
context_length as usize])
-                        .map_err(|_| IggyError::InvalidUtf8)?
-                        .to_string();
-                Some(context)
+
+        let context = if let Some(len_bytes) = bytes.get(position..position + 
4) {
+            let context_length = u32::from_le_bytes(
+                len_bytes
+                    .try_into()
+                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
+            );
+            position += 4;
+            match context_length {
+                0 => None,
+                _ => {
+                    let context = from_utf8(
+                        bytes
+                            .get(position..position + context_length as usize)
+                            .ok_or(IggyError::InvalidCommand)?,
+                    )
+                    .map_err(|_| IggyError::InvalidUtf8)?
+                    .to_string();
+                    Some(context)
+                }
             }
+        } else {
+            None
         };
 
         let command = LoginUser {
@@ -226,6 +252,60 @@ mod tests {
         assert_eq!(context, command.context);
     }
 
+    #[test]
+    fn from_bytes_should_fail_on_empty_input() {
+        assert!(LoginUser::from_bytes(Bytes::new()).is_err());
+    }
+
+    #[test]
+    fn from_bytes_should_fail_on_truncated_input() {
+        let command = LoginUser {
+            username: "user".to_string(),
+            password: "secret".to_string(),
+            version: Some("1.0.0".to_string()),
+            context: Some("test".to_string()),
+        };
+        let bytes = command.to_bytes();
+        // Truncate at every position up to (but not including) the version 
field.
+        // Positions within username/password must error; positions at or past 
the
+        // version boundary are valid old-SDK payloads.
+        let version_offset = 2 + command.username.len() + 
command.password.len();
+        for i in 0..version_offset {
+            let truncated = bytes.slice(..i);
+            assert!(
+                LoginUser::from_bytes(truncated).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
+    #[test]
+    fn from_bytes_should_fail_on_corrupted_username_length() {
+        let mut buf = BytesMut::new();
+        buf.put_u8(255); // username_length = 255
+        buf.put_slice(b"short");
+        assert!(LoginUser::from_bytes(buf.freeze()).is_err());
+    }
+
+    #[test]
+    fn from_bytes_should_accept_old_sdk_format_without_version_context() {
+        let username = "user";
+        let password = "secret";
+        let mut bytes = BytesMut::new();
+        #[allow(clippy::cast_possible_truncation)]
+        bytes.put_u8(username.len() as u8);
+        bytes.put_slice(username.as_bytes());
+        #[allow(clippy::cast_possible_truncation)]
+        bytes.put_u8(password.len() as u8);
+        bytes.put_slice(password.as_bytes());
+
+        let command = LoginUser::from_bytes(bytes.freeze()).unwrap();
+        assert_eq!(command.username, username);
+        assert_eq!(command.password, password);
+        assert_eq!(command.version, None);
+        assert_eq!(command.context, None);
+    }
+
     #[test]
     fn should_be_deserialized_from_bytes() {
         let username = "user";
diff --git a/core/common/src/commands/users/update_user.rs 
b/core/common/src/commands/users/update_user.rs
index 3e2811915..b6df319a7 100644
--- a/core/common/src/commands/users/update_user.rs
+++ b/core/common/src/commands/users/update_user.rs
@@ -96,32 +96,37 @@ impl BytesSerializable for UpdateUser {
 
         let user_id = Identifier::from_bytes(bytes.clone())?;
         let mut position = user_id.get_size_bytes().as_bytes_usize();
-        let has_username = bytes[position];
+        let has_username = 
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
         if has_username > 1 {
             return Err(IggyError::InvalidCommand);
         }
 
         position += 1;
         let username = if has_username == 1 {
-            let username_length = bytes[position];
+            let username_length = 
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
             position += 1;
-            let username = from_utf8(&bytes[position..position + 
username_length as usize])
-                .map_err(|_| IggyError::InvalidUtf8)?
-                .to_string();
+            let username = from_utf8(
+                bytes
+                    .get(position..position + username_length as usize)
+                    .ok_or(IggyError::InvalidCommand)?,
+            )
+            .map_err(|_| IggyError::InvalidUtf8)?
+            .to_string();
             position += username_length as usize;
             Some(username)
         } else {
             None
         };
 
-        let has_status = bytes[position];
+        let has_status = 
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
         if has_status > 1 {
             return Err(IggyError::InvalidCommand);
         }
 
         let status = if has_status == 1 {
             position += 1;
-            let status = UserStatus::from_code(bytes[position])?;
+            let status =
+                
UserStatus::from_code(*bytes.get(position).ok_or(IggyError::InvalidCommand)?)?;
             Some(status)
         } else {
             None
@@ -180,6 +185,28 @@ mod tests {
         assert_eq!(status, command.status.unwrap());
     }
 
+    #[test]
+    fn from_bytes_should_fail_on_empty_input() {
+        assert!(UpdateUser::from_bytes(Bytes::new()).is_err());
+    }
+
+    #[test]
+    fn from_bytes_should_fail_on_truncated_input() {
+        let command = UpdateUser {
+            user_id: Identifier::numeric(1).unwrap(),
+            username: Some("user".to_string()),
+            status: Some(UserStatus::Active),
+        };
+        let bytes = command.to_bytes();
+        for i in 0..bytes.len() - 1 {
+            let truncated = bytes.slice(..i);
+            assert!(
+                UpdateUser::from_bytes(truncated).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
     #[test]
     fn should_be_deserialized_from_bytes() {
         let user_id = Identifier::numeric(1).unwrap();
diff --git a/core/common/src/types/identifier/mod.rs 
b/core/common/src/types/identifier/mod.rs
index bd4a5ccc0..9f17b357d 100644
--- a/core/common/src/types/identifier/mod.rs
+++ b/core/common/src/types/identifier/mod.rs
@@ -187,12 +187,12 @@ impl Identifier {
 
     /// Creates identifier from raw bytes
     pub fn from_raw_bytes(bytes: &[u8]) -> Result<Self, IggyError> {
-        let kind = IdKind::from_code(bytes[0])?;
-        let length = bytes[1];
-        let value = bytes[2..2 + length as usize].to_vec();
-        if value.len() != length as usize {
-            return Err(IggyError::InvalidIdentifier);
-        }
+        let kind = 
IdKind::from_code(*bytes.first().ok_or(IggyError::InvalidIdentifier)?)?;
+        let length = *bytes.get(1).ok_or(IggyError::InvalidIdentifier)?;
+        let value = bytes
+            .get(2..2 + length as usize)
+            .ok_or(IggyError::InvalidIdentifier)?
+            .to_vec();
 
         let identifier = Identifier {
             kind,
@@ -234,10 +234,10 @@ impl BytesSerializable for Identifier {
 
         let kind = IdKind::from_code(bytes[0])?;
         let length = bytes[1];
-        let value = bytes[2..2 + length as usize].to_vec();
-        if value.len() != length as usize {
-            return Err(IggyError::InvalidIdentifier);
-        }
+        let value = bytes
+            .get(2..2 + length as usize)
+            .ok_or(IggyError::InvalidIdentifier)?
+            .to_vec();
 
         let identifier = Identifier {
             kind,
@@ -382,6 +382,55 @@ mod tests {
         assert!(Identifier::named(&"a".repeat(256)).is_err());
     }
 
+    #[test]
+    fn from_bytes_should_fail_on_empty_input() {
+        assert!(Identifier::from_bytes(Bytes::new()).is_err());
+    }
+
+    #[test]
+    fn from_bytes_should_fail_on_truncated_input() {
+        let id = Identifier::numeric(42).unwrap();
+        let bytes = id.to_bytes();
+        for i in 0..bytes.len() - 1 {
+            let truncated = bytes.slice(..i);
+            assert!(
+                Identifier::from_bytes(truncated).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
+    #[test]
+    fn from_bytes_should_fail_on_corrupted_length() {
+        let mut buf = BytesMut::new();
+        buf.put_u8(1); // Numeric kind
+        buf.put_u8(255); // length = 255 but only 2 bytes of value follow
+        buf.put_u16_le(0);
+        assert!(Identifier::from_bytes(buf.freeze()).is_err());
+    }
+
+    #[test]
+    fn from_raw_bytes_should_fail_on_empty_input() {
+        assert!(Identifier::from_raw_bytes(&[]).is_err());
+    }
+
+    #[test]
+    fn from_raw_bytes_should_fail_on_truncated_input() {
+        let id = Identifier::numeric(42).unwrap();
+        let bytes = id.to_bytes();
+        for i in 0..bytes.len() - 1 {
+            assert!(
+                Identifier::from_raw_bytes(&bytes[..i]).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+
+    #[test]
+    fn from_raw_bytes_should_fail_on_corrupted_length() {
+        assert!(Identifier::from_raw_bytes(&[1, 255, 0, 0]).is_err());
+    }
+
     #[test]
     fn numeric_id_should_be_converted_into_identifier_using_trait() {
         let id = 1;
diff --git a/core/common/src/types/message/message_view.rs 
b/core/common/src/types/message/message_view.rs
index 1f1442ea7..c9a7bb76f 100644
--- a/core/common/src/types/message/message_view.rs
+++ b/core/common/src/types/message/message_view.rs
@@ -108,7 +108,10 @@ impl<'a> IggyMessageView<'a> {
     pub fn payload(&self) -> &[u8] {
         let header_view = self.header();
         let payload_len = header_view.payload_length();
-        &self.buffer[self.payload_offset..self.payload_offset + payload_len]
+        let end = self.payload_offset + payload_len;
+        self.buffer
+            .get(self.payload_offset..end)
+            .unwrap_or_default()
     }
 
     /// Validates that the message view is properly formatted and has valid 
data.
@@ -131,9 +134,13 @@ impl<'a> IggyMessageView<'a> {
     /// Validates that the message view has a valid checksum.
     /// This should be called only on server side.
     pub fn calculate_checksum(&self) -> u64 {
-        let checksum_field_size = size_of::<u64>(); // Skip checksum field for 
checksum calculation
+        let checksum_field_size = size_of::<u64>();
         let size = self.size() - checksum_field_size;
-        let data = &self.buffer[checksum_field_size..checksum_field_size + 
size];
+        let end = checksum_field_size + size;
+        let data = self
+            .buffer
+            .get(checksum_field_size..end)
+            .unwrap_or_default();
         checksum::calculate_checksum(data)
     }
 }
@@ -186,8 +193,17 @@ impl<'a> Iterator for IggyMessageViewIterator<'a> {
         }
 
         let remaining = &self.buffer[self.position..];
+        if remaining.len() < IGGY_MESSAGE_HEADER_SIZE {
+            return None;
+        }
+
         let view = IggyMessageView::new(remaining);
-        self.position += view.size();
+        let size = view.size();
+        if size > remaining.len() {
+            return None;
+        }
+
+        self.position += size;
         Some(view)
     }
 }
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs 
b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 713d64ab2..477265b27 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -85,9 +85,11 @@ impl ServerCommandHandler for SendMessages {
         self.partitioning = partitioning;
 
         let messages_count = u32::from_le_bytes(
-            metadata_buf[element_size..element_size + 4]
+            metadata_buf
+                .get(element_size..element_size + 4)
+                .ok_or(IggyError::InvalidCommand)?
                 .try_into()
-                .unwrap(),
+                .map_err(|_| IggyError::InvalidNumberEncoding)?,
         );
         let indexes_size = messages_count as usize * INDEX_SIZE;
 


Reply via email to