This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch deserialization-hardening in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 0ffc8de4e2b4bc281c41574aad5860eb59fece53 Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Feb 16 10:59:58 2026 +0100 fix(security): prevent server crash from malformed command payloads Any client sending a short or malformed binary payload could panic the server via unchecked slice indexing in from_bytes deserialization. The panic propagated through catch_unwind in spawn_connection, which then sent a stop signal to ALL shards — taking down the entire server from a single bad packet. Replace direct slice indexing (bytes[a..b]) with checked access (bytes.get(a..b).ok_or(...)?) across all 11 vulnerable command deserializers, Identifier, IggyMessageView, and the send_messages handler. LoginUser additionally makes version and context fields optional for backward compatibility with older SDKs (e.g. v0.8.0) that omit them. --- .../consumer_groups/create_consumer_group.rs | 30 ++++- .../delete_personal_access_token.rs | 36 ++++- .../login_with_personal_access_token.rs | 36 ++++- core/common/src/commands/streams/create_stream.rs | 36 ++++- core/common/src/commands/topics/create_topic.rs | 56 ++++++-- core/common/src/commands/topics/update_topic.rs | 52 ++++++-- core/common/src/commands/users/change_password.rs | 43 ++++-- core/common/src/commands/users/create_user.rs | 64 +++++++-- core/common/src/commands/users/login_user.rs | 148 ++++++++++++++++----- core/common/src/commands/users/update_user.rs | 41 +++++- core/common/src/types/identifier/mod.rs | 69 ++++++++-- core/common/src/types/message/message_view.rs | 24 +++- .../handlers/messages/send_messages_handler.rs | 6 +- 13 files changed, 532 insertions(+), 109 deletions(-) diff --git a/core/common/src/commands/consumer_groups/create_consumer_group.rs b/core/common/src/commands/consumer_groups/create_consumer_group.rs index c03d39964..f697bc9c7 100644 --- a/core/common/src/commands/consumer_groups/create_consumer_group.rs +++ b/core/common/src/commands/consumer_groups/create_consumer_group.rs @@ -96,10 +96,14 @@ impl BytesSerializable for CreateConsumerGroup { position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; position += topic_id.get_size_bytes().as_bytes_usize(); - let name_length = bytes[position]; - let name = from_utf8(&bytes[position + 1..position + 1 + name_length as usize]) - .map_err(|_| IggyError::InvalidUtf8)? - .to_string(); + let name_length = *bytes.get(position).ok_or(IggyError::InvalidCommand)?; + let name = from_utf8( + bytes + .get(position + 1..position + 1 + name_length as usize) + .ok_or(IggyError::InvalidCommand)?, + ) + .map_err(|_| IggyError::InvalidUtf8)? + .to_string(); let command = CreateConsumerGroup { stream_id, topic_id, @@ -142,6 +146,24 @@ mod tests { assert_eq!(name, command.name); } + #[test] + fn from_bytes_should_fail_on_empty_input() { + assert!(CreateConsumerGroup::from_bytes(Bytes::new()).is_err()); + } + + #[test] + fn from_bytes_should_fail_on_truncated_input() { + let command = CreateConsumerGroup::default(); + let bytes = command.to_bytes(); + for i in 0..bytes.len() - 1 { + let truncated = bytes.slice(..i); + assert!( + CreateConsumerGroup::from_bytes(truncated).is_err(), + "expected error for truncation at byte {i}" + ); + } + } + #[test] fn should_be_deserialized_from_bytes() { let stream_id = Identifier::numeric(1).unwrap(); diff --git a/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs b/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs index 064f78c48..e0e38e32d 100644 --- a/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs +++ b/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs @@ -77,9 +77,13 @@ impl BytesSerializable for DeletePersonalAccessToken { } let name_length = bytes[0]; - let name = from_utf8(&bytes[1..1 + name_length as usize]) - .map_err(|_| IggyError::InvalidUtf8)? - .to_string(); + let name = from_utf8( + bytes + .get(1..1 + name_length as usize) + .ok_or(IggyError::InvalidCommand)?, + ) + .map_err(|_| IggyError::InvalidUtf8)? + .to_string(); if name.len() != name_length as usize { return Err(IggyError::InvalidCommand); } @@ -112,6 +116,32 @@ mod tests { assert_eq!(name, command.name); } + #[test] + fn from_bytes_should_fail_on_empty_input() { + assert!(DeletePersonalAccessToken::from_bytes(Bytes::new()).is_err()); + } + + #[test] + fn from_bytes_should_fail_on_truncated_input() { + let command = DeletePersonalAccessToken::default(); + let bytes = command.to_bytes(); + for i in 0..bytes.len() - 1 { + let truncated = bytes.slice(..i); + assert!( + DeletePersonalAccessToken::from_bytes(truncated).is_err(), + "expected error for truncation at byte {i}" + ); + } + } + + #[test] + fn from_bytes_should_fail_on_corrupted_name_length() { + let mut buf = BytesMut::new(); + buf.put_u8(255); + buf.put_slice(b"short"); + assert!(DeletePersonalAccessToken::from_bytes(buf.freeze()).is_err()); + } + #[test] fn should_be_deserialized_from_bytes() { let name = "test"; diff --git a/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs b/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs index cdba04e37..52405748f 100644 --- a/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs +++ b/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs @@ -74,9 +74,13 @@ impl BytesSerializable for LoginWithPersonalAccessToken { } let token_length = bytes[0]; - let token = from_utf8(&bytes[1..1 + token_length as usize]) - .map_err(|_| IggyError::InvalidUtf8)? - .to_string(); + let token = from_utf8( + bytes + .get(1..1 + token_length as usize) + .ok_or(IggyError::InvalidCommand)?, + ) + .map_err(|_| IggyError::InvalidUtf8)? + .to_string(); if token.len() != token_length as usize { return Err(IggyError::InvalidCommand); } @@ -109,6 +113,32 @@ mod tests { assert_eq!(token, command.token); } + #[test] + fn from_bytes_should_fail_on_empty_input() { + assert!(LoginWithPersonalAccessToken::from_bytes(Bytes::new()).is_err()); + } + + #[test] + fn from_bytes_should_fail_on_truncated_input() { + let command = LoginWithPersonalAccessToken::default(); + let bytes = command.to_bytes(); + for i in 0..bytes.len() - 1 { + let truncated = bytes.slice(..i); + assert!( + LoginWithPersonalAccessToken::from_bytes(truncated).is_err(), + "expected error for truncation at byte {i}" + ); + } + } + + #[test] + fn from_bytes_should_fail_on_corrupted_token_length() { + let mut buf = BytesMut::new(); + buf.put_u8(255); + buf.put_slice(b"short"); + assert!(LoginWithPersonalAccessToken::from_bytes(buf.freeze()).is_err()); + } + #[test] fn should_be_deserialized_from_bytes() { let token = "test"; diff --git a/core/common/src/commands/streams/create_stream.rs b/core/common/src/commands/streams/create_stream.rs index 9ff2fec5a..7dd75f034 100644 --- a/core/common/src/commands/streams/create_stream.rs +++ b/core/common/src/commands/streams/create_stream.rs @@ -74,9 +74,13 @@ impl BytesSerializable for CreateStream { } let name_length = bytes[0]; - let name = from_utf8(&bytes[1..1 + name_length as usize]) - .map_err(|_| IggyError::InvalidUtf8)? - .to_string(); + let name = from_utf8( + bytes + .get(1..1 + name_length as usize) + .ok_or(IggyError::InvalidCommand)?, + ) + .map_err(|_| IggyError::InvalidUtf8)? + .to_string(); if name.len() != name_length as usize { return Err(IggyError::InvalidCommand); } @@ -110,6 +114,32 @@ mod tests { assert_eq!(name, command.name); } + #[test] + fn from_bytes_should_fail_on_empty_input() { + assert!(CreateStream::from_bytes(Bytes::new()).is_err()); + } + + #[test] + fn from_bytes_should_fail_on_truncated_input() { + let command = CreateStream::default(); + let bytes = command.to_bytes(); + for i in 0..bytes.len() - 1 { + let truncated = bytes.slice(..i); + assert!( + CreateStream::from_bytes(truncated).is_err(), + "expected error for truncation at byte {i}" + ); + } + } + + #[test] + fn from_bytes_should_fail_on_corrupted_name_length() { + let mut buf = BytesMut::new(); + buf.put_u8(255); + buf.put_slice(b"short"); + assert!(CreateStream::from_bytes(buf.freeze()).is_err()); + } + #[test] fn should_be_deserialized_from_bytes() { let name = "test".to_string(); diff --git a/core/common/src/commands/topics/create_topic.rs b/core/common/src/commands/topics/create_topic.rs index 48cd3d525..47d2acc70 100644 --- a/core/common/src/commands/topics/create_topic.rs +++ b/core/common/src/commands/topics/create_topic.rs @@ -127,31 +127,43 @@ impl BytesSerializable for CreateTopic { let stream_id = Identifier::from_bytes(bytes.clone())?; position += stream_id.get_size_bytes().as_bytes_usize(); let partitions_count = u32::from_le_bytes( - bytes[position..position + 4] + bytes + .get(position..position + 4) + .ok_or(IggyError::InvalidCommand)? .try_into() .map_err(|_| IggyError::InvalidNumberEncoding)?, ); - let compression_algorithm = CompressionAlgorithm::from_code(bytes[position + 4])?; + let compression_algorithm = CompressionAlgorithm::from_code( + *bytes.get(position + 4).ok_or(IggyError::InvalidCommand)?, + )?; let message_expiry = u64::from_le_bytes( - bytes[position + 5..position + 13] + bytes + .get(position + 5..position + 13) + .ok_or(IggyError::InvalidCommand)? .try_into() .map_err(|_| IggyError::InvalidNumberEncoding)?, ); let message_expiry: IggyExpiry = message_expiry.into(); let max_topic_size = u64::from_le_bytes( - bytes[position + 13..position + 21] + bytes + .get(position + 13..position + 21) + .ok_or(IggyError::InvalidCommand)? .try_into() .map_err(|_| IggyError::InvalidNumberEncoding)?, ); let max_topic_size: MaxTopicSize = max_topic_size.into(); - let replication_factor = match bytes[position + 21] { + let replication_factor = match *bytes.get(position + 21).ok_or(IggyError::InvalidCommand)? { 0 => None, factor => Some(factor), }; - let name_length = bytes[position + 22]; - let name = from_utf8(&bytes[position + 23..(position + 23 + name_length as usize)]) - .map_err(|_| IggyError::InvalidUtf8)? - .to_string(); + let name_length = *bytes.get(position + 22).ok_or(IggyError::InvalidCommand)?; + let name = from_utf8( + bytes + .get(position + 23..position + 23 + name_length as usize) + .ok_or(IggyError::InvalidCommand)?, + ) + .map_err(|_| IggyError::InvalidUtf8)? + .to_string(); if name.len() != name_length as usize { return Err(IggyError::InvalidCommand); } @@ -229,6 +241,32 @@ mod tests { assert_eq!(name, command.name); } + #[test] + fn from_bytes_should_fail_on_empty_input() { + assert!(CreateTopic::from_bytes(Bytes::new()).is_err()); + } + + #[test] + fn from_bytes_should_fail_on_truncated_input() { + let command = CreateTopic { + stream_id: Identifier::numeric(1).unwrap(), + partitions_count: 3, + compression_algorithm: CompressionAlgorithm::None, + message_expiry: IggyExpiry::NeverExpire, + max_topic_size: MaxTopicSize::ServerDefault, + replication_factor: Some(1), + name: "test".to_string(), + }; + let bytes = command.to_bytes(); + for i in 0..bytes.len() - 1 { + let truncated = bytes.slice(..i); + assert!( + CreateTopic::from_bytes(truncated).is_err(), + "expected error for truncation at byte {i}" + ); + } + } + #[test] fn should_be_deserialized_from_bytes() { let stream_id = Identifier::numeric(1).unwrap(); diff --git a/core/common/src/commands/topics/update_topic.rs b/core/common/src/commands/topics/update_topic.rs index 83dadb69c..35fe67fce 100644 --- a/core/common/src/commands/topics/update_topic.rs +++ b/core/common/src/commands/topics/update_topic.rs @@ -128,28 +128,38 @@ impl BytesSerializable for UpdateTopic { position += stream_id.get_size_bytes().as_bytes_usize(); let topic_id = Identifier::from_bytes(bytes.slice(position..))?; position += topic_id.get_size_bytes().as_bytes_usize(); - let compression_algorithm = CompressionAlgorithm::from_code(bytes[position])?; + let compression_algorithm = CompressionAlgorithm::from_code( + *bytes.get(position).ok_or(IggyError::InvalidCommand)?, + )?; position += 1; let message_expiry = u64::from_le_bytes( - bytes[position..position + 8] + bytes + .get(position..position + 8) + .ok_or(IggyError::InvalidCommand)? .try_into() .map_err(|_| IggyError::InvalidNumberEncoding)?, ); let message_expiry: IggyExpiry = message_expiry.into(); let max_topic_size = u64::from_le_bytes( - bytes[position + 8..position + 16] + bytes + .get(position + 8..position + 16) + .ok_or(IggyError::InvalidCommand)? .try_into() .map_err(|_| IggyError::InvalidNumberEncoding)?, ); let max_topic_size: MaxTopicSize = max_topic_size.into(); - let replication_factor = match bytes[position + 16] { + let replication_factor = match *bytes.get(position + 16).ok_or(IggyError::InvalidCommand)? { 0 => None, factor => Some(factor), }; - let name_length = bytes[position + 17]; - let name = from_utf8(&bytes[position + 18..(position + 18 + name_length as usize)]) - .map_err(|_| IggyError::InvalidUtf8)? - .to_string(); + let name_length = *bytes.get(position + 17).ok_or(IggyError::InvalidCommand)?; + let name = from_utf8( + bytes + .get(position + 18..position + 18 + name_length as usize) + .ok_or(IggyError::InvalidCommand)?, + ) + .map_err(|_| IggyError::InvalidUtf8)? + .to_string(); if name.len() != name_length as usize { return Err(IggyError::InvalidCommand); } @@ -229,6 +239,32 @@ mod tests { assert_eq!(name, command.name); } + #[test] + fn from_bytes_should_fail_on_empty_input() { + assert!(UpdateTopic::from_bytes(Bytes::new()).is_err()); + } + + #[test] + fn from_bytes_should_fail_on_truncated_input() { + let command = UpdateTopic { + stream_id: Identifier::numeric(1).unwrap(), + topic_id: Identifier::numeric(2).unwrap(), + compression_algorithm: CompressionAlgorithm::None, + message_expiry: IggyExpiry::NeverExpire, + max_topic_size: MaxTopicSize::ServerDefault, + replication_factor: Some(1), + name: "test".to_string(), + }; + let bytes = command.to_bytes(); + for i in 0..bytes.len() - 1 { + let truncated = bytes.slice(..i); + assert!( + UpdateTopic::from_bytes(truncated).is_err(), + "expected error for truncation at byte {i}" + ); + } + } + #[test] fn should_be_deserialized_from_bytes() { let stream_id = Identifier::numeric(1).unwrap(); diff --git a/core/common/src/commands/users/change_password.rs b/core/common/src/commands/users/change_password.rs index 4a5663778..ba8e251db 100644 --- a/core/common/src/commands/users/change_password.rs +++ b/core/common/src/commands/users/change_password.rs @@ -101,18 +101,25 @@ impl BytesSerializable for ChangePassword { let user_id = Identifier::from_bytes(bytes.clone())?; let mut position = user_id.get_size_bytes().as_bytes_usize(); - let current_password_length = bytes[position]; + let current_password_length = *bytes.get(position).ok_or(IggyError::InvalidCommand)?; position += 1; - let current_password = - from_utf8(&bytes[position..position + current_password_length as usize]) - .map_err(|_| IggyError::InvalidUtf8)? - .to_string(); + let current_password = from_utf8( + bytes + .get(position..position + current_password_length as usize) + .ok_or(IggyError::InvalidCommand)?, + ) + .map_err(|_| IggyError::InvalidUtf8)? + .to_string(); position += current_password_length as usize; - let new_password_length = bytes[position]; + let new_password_length = *bytes.get(position).ok_or(IggyError::InvalidCommand)?; position += 1; - let new_password = from_utf8(&bytes[position..position + new_password_length as usize]) - .map_err(|_| IggyError::InvalidUtf8)? - .to_string(); + let new_password = from_utf8( + bytes + .get(position..position + new_password_length as usize) + .ok_or(IggyError::InvalidCommand)?, + ) + .map_err(|_| IggyError::InvalidUtf8)? + .to_string(); let command = ChangePassword { user_id, @@ -160,6 +167,24 @@ mod tests { assert_eq!(new_password, command.new_password); } + #[test] + fn from_bytes_should_fail_on_empty_input() { + assert!(ChangePassword::from_bytes(Bytes::new()).is_err()); + } + + #[test] + fn from_bytes_should_fail_on_truncated_input() { + let command = ChangePassword::default(); + let bytes = command.to_bytes(); + for i in 0..bytes.len() - 1 { + let truncated = bytes.slice(..i); + assert!( + ChangePassword::from_bytes(truncated).is_err(), + "expected error for truncation at byte {i}" + ); + } + } + #[test] fn should_be_deserialized_from_bytes() { let user_id = Identifier::numeric(1).unwrap(); diff --git a/core/common/src/commands/users/create_user.rs b/core/common/src/commands/users/create_user.rs index 7d23c54ee..b273a60a8 100644 --- a/core/common/src/commands/users/create_user.rs +++ b/core/common/src/commands/users/create_user.rs @@ -111,27 +111,35 @@ impl BytesSerializable for CreateUser { } let username_length = bytes[0]; - let username = from_utf8(&bytes[1..1 + username_length as usize]) - .map_err(|_| IggyError::InvalidUtf8)? - .to_string(); + let username = from_utf8( + bytes + .get(1..1 + username_length as usize) + .ok_or(IggyError::InvalidCommand)?, + ) + .map_err(|_| IggyError::InvalidUtf8)? + .to_string(); if username.len() != username_length as usize { return Err(IggyError::InvalidCommand); } let mut position = 1 + username_length as usize; - let password_length = bytes[position]; + let password_length = *bytes.get(position).ok_or(IggyError::InvalidCommand)?; position += 1; - let password = from_utf8(&bytes[position..position + password_length as usize]) - .map_err(|_| IggyError::InvalidUtf8)? - .to_string(); + let password = from_utf8( + bytes + .get(position..position + password_length as usize) + .ok_or(IggyError::InvalidCommand)?, + ) + .map_err(|_| IggyError::InvalidUtf8)? + .to_string(); if password.len() != password_length as usize { return Err(IggyError::InvalidCommand); } position += password_length as usize; - let status = UserStatus::from_code(bytes[position])?; + let status = UserStatus::from_code(*bytes.get(position).ok_or(IggyError::InvalidCommand)?)?; position += 1; - let has_permissions = bytes[position]; + let has_permissions = *bytes.get(position).ok_or(IggyError::InvalidCommand)?; if has_permissions > 1 { return Err(IggyError::InvalidCommand); } @@ -139,14 +147,18 @@ impl BytesSerializable for CreateUser { position += 1; let permissions = if has_permissions == 1 { let permissions_length = u32::from_le_bytes( - bytes[position..position + 4] + bytes + .get(position..position + 4) + .ok_or(IggyError::InvalidCommand)? .try_into() .map_err(|_| IggyError::InvalidNumberEncoding)?, ); position += 4; - Some(Permissions::from_bytes( - bytes.slice(position..position + permissions_length as usize), - )?) + let end = position + permissions_length as usize; + if end > bytes.len() { + return Err(IggyError::InvalidCommand); + } + Some(Permissions::from_bytes(bytes.slice(position..end))?) } else { None }; @@ -232,6 +244,32 @@ mod tests { assert_eq!(permissions, command.permissions.unwrap()); } + #[test] + fn from_bytes_should_fail_on_empty_input() { + assert!(CreateUser::from_bytes(Bytes::new()).is_err()); + } + + #[test] + fn from_bytes_should_fail_on_truncated_input() { + let command = CreateUser::default(); + let bytes = command.to_bytes(); + for i in 0..bytes.len() - 1 { + let truncated = bytes.slice(..i); + assert!( + CreateUser::from_bytes(truncated).is_err(), + "expected error for truncation at byte {i}" + ); + } + } + + #[test] + fn from_bytes_should_fail_on_corrupted_username_length() { + let mut buf = BytesMut::new(); + buf.put_u8(255); + buf.put_slice(b"short"); + assert!(CreateUser::from_bytes(buf.freeze()).is_err()); + } + #[test] fn should_be_deserialized_from_bytes() { let username = "user"; diff --git a/core/common/src/commands/users/login_user.rs b/core/common/src/commands/users/login_user.rs index edd1e8426..e0005e8b8 100644 --- a/core/common/src/commands/users/login_user.rs +++ b/core/common/src/commands/users/login_user.rs @@ -115,17 +115,23 @@ impl BytesSerializable for LoginUser { } let username_length = bytes[0]; - let username = from_utf8(&bytes[1..=(username_length as usize)]) - .map_err(|_| IggyError::InvalidUtf8)? - .to_string(); + let username = from_utf8( + bytes + .get(1..1 + username_length as usize) + .ok_or(IggyError::InvalidCommand)?, + ) + .map_err(|_| IggyError::InvalidUtf8)? + .to_string(); if username.len() != username_length as usize { return Err(IggyError::InvalidCommand); } - let password_length = bytes[1 + username_length as usize]; + let pos = 1 + username_length as usize; + let password_length = *bytes.get(pos).ok_or(IggyError::InvalidCommand)?; let password = from_utf8( - &bytes[2 + username_length as usize - ..2 + username_length as usize + password_length as usize], + bytes + .get(pos + 1..pos + 1 + password_length as usize) + .ok_or(IggyError::InvalidCommand)?, ) .map_err(|_| IggyError::InvalidUtf8)? .to_string(); @@ -133,37 +139,57 @@ impl BytesSerializable for LoginUser { return Err(IggyError::InvalidCommand); } - let position = 2 + username_length as usize + password_length as usize; - let version_length = u32::from_le_bytes( - bytes[position..position + 4] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - let version = match version_length { - 0 => None, - _ => { - let version = - from_utf8(&bytes[position + 4..position + 4 + version_length as usize]) - .map_err(|_| IggyError::InvalidUtf8)? - .to_string(); - Some(version) + let mut position = pos + 1 + password_length as usize; + + // Version and context fields are optional for backward compatibility + // with older SDKs (e.g. v0.8.0) that don't send them. + let version = if let Some(len_bytes) = bytes.get(position..position + 4) { + let version_length = u32::from_le_bytes( + len_bytes + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ); + position += 4; + match version_length { + 0 => None, + _ => { + let version = from_utf8( + bytes + .get(position..position + version_length as usize) + .ok_or(IggyError::InvalidCommand)?, + ) + .map_err(|_| IggyError::InvalidUtf8)? + .to_string(); + position += version_length as usize; + Some(version) + } } + } else { + None }; - let position = position + 4 + version_length as usize; - let context_length = u32::from_le_bytes( - bytes[position..position + 4] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - let context = match context_length { - 0 => None, - _ => { - let context = - from_utf8(&bytes[position + 4..position + 4 + context_length as usize]) - .map_err(|_| IggyError::InvalidUtf8)? - .to_string(); - Some(context) + + let context = if let Some(len_bytes) = bytes.get(position..position + 4) { + let context_length = u32::from_le_bytes( + len_bytes + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ); + position += 4; + match context_length { + 0 => None, + _ => { + let context = from_utf8( + bytes + .get(position..position + context_length as usize) + .ok_or(IggyError::InvalidCommand)?, + ) + .map_err(|_| IggyError::InvalidUtf8)? + .to_string(); + Some(context) + } } + } else { + None }; let command = LoginUser { @@ -226,6 +252,60 @@ mod tests { assert_eq!(context, command.context); } + #[test] + fn from_bytes_should_fail_on_empty_input() { + assert!(LoginUser::from_bytes(Bytes::new()).is_err()); + } + + #[test] + fn from_bytes_should_fail_on_truncated_input() { + let command = LoginUser { + username: "user".to_string(), + password: "secret".to_string(), + version: Some("1.0.0".to_string()), + context: Some("test".to_string()), + }; + let bytes = command.to_bytes(); + // Truncate at every position up to (but not including) the version field. + // Positions within username/password must error; positions at or past the + // version boundary are valid old-SDK payloads. + let version_offset = 2 + command.username.len() + command.password.len(); + for i in 0..version_offset { + let truncated = bytes.slice(..i); + assert!( + LoginUser::from_bytes(truncated).is_err(), + "expected error for truncation at byte {i}" + ); + } + } + + #[test] + fn from_bytes_should_fail_on_corrupted_username_length() { + let mut buf = BytesMut::new(); + buf.put_u8(255); // username_length = 255 + buf.put_slice(b"short"); + assert!(LoginUser::from_bytes(buf.freeze()).is_err()); + } + + #[test] + fn from_bytes_should_accept_old_sdk_format_without_version_context() { + let username = "user"; + let password = "secret"; + let mut bytes = BytesMut::new(); + #[allow(clippy::cast_possible_truncation)] + bytes.put_u8(username.len() as u8); + bytes.put_slice(username.as_bytes()); + #[allow(clippy::cast_possible_truncation)] + bytes.put_u8(password.len() as u8); + bytes.put_slice(password.as_bytes()); + + let command = LoginUser::from_bytes(bytes.freeze()).unwrap(); + assert_eq!(command.username, username); + assert_eq!(command.password, password); + assert_eq!(command.version, None); + assert_eq!(command.context, None); + } + #[test] fn should_be_deserialized_from_bytes() { let username = "user"; diff --git a/core/common/src/commands/users/update_user.rs b/core/common/src/commands/users/update_user.rs index 3e2811915..b6df319a7 100644 --- a/core/common/src/commands/users/update_user.rs +++ b/core/common/src/commands/users/update_user.rs @@ -96,32 +96,37 @@ impl BytesSerializable for UpdateUser { let user_id = Identifier::from_bytes(bytes.clone())?; let mut position = user_id.get_size_bytes().as_bytes_usize(); - let has_username = bytes[position]; + let has_username = *bytes.get(position).ok_or(IggyError::InvalidCommand)?; if has_username > 1 { return Err(IggyError::InvalidCommand); } position += 1; let username = if has_username == 1 { - let username_length = bytes[position]; + let username_length = *bytes.get(position).ok_or(IggyError::InvalidCommand)?; position += 1; - let username = from_utf8(&bytes[position..position + username_length as usize]) - .map_err(|_| IggyError::InvalidUtf8)? - .to_string(); + let username = from_utf8( + bytes + .get(position..position + username_length as usize) + .ok_or(IggyError::InvalidCommand)?, + ) + .map_err(|_| IggyError::InvalidUtf8)? + .to_string(); position += username_length as usize; Some(username) } else { None }; - let has_status = bytes[position]; + let has_status = *bytes.get(position).ok_or(IggyError::InvalidCommand)?; if has_status > 1 { return Err(IggyError::InvalidCommand); } let status = if has_status == 1 { position += 1; - let status = UserStatus::from_code(bytes[position])?; + let status = + UserStatus::from_code(*bytes.get(position).ok_or(IggyError::InvalidCommand)?)?; Some(status) } else { None @@ -180,6 +185,28 @@ mod tests { assert_eq!(status, command.status.unwrap()); } + #[test] + fn from_bytes_should_fail_on_empty_input() { + assert!(UpdateUser::from_bytes(Bytes::new()).is_err()); + } + + #[test] + fn from_bytes_should_fail_on_truncated_input() { + let command = UpdateUser { + user_id: Identifier::numeric(1).unwrap(), + username: Some("user".to_string()), + status: Some(UserStatus::Active), + }; + let bytes = command.to_bytes(); + for i in 0..bytes.len() - 1 { + let truncated = bytes.slice(..i); + assert!( + UpdateUser::from_bytes(truncated).is_err(), + "expected error for truncation at byte {i}" + ); + } + } + #[test] fn should_be_deserialized_from_bytes() { let user_id = Identifier::numeric(1).unwrap(); diff --git a/core/common/src/types/identifier/mod.rs b/core/common/src/types/identifier/mod.rs index bd4a5ccc0..9f17b357d 100644 --- a/core/common/src/types/identifier/mod.rs +++ b/core/common/src/types/identifier/mod.rs @@ -187,12 +187,12 @@ impl Identifier { /// Creates identifier from raw bytes pub fn from_raw_bytes(bytes: &[u8]) -> Result<Self, IggyError> { - let kind = IdKind::from_code(bytes[0])?; - let length = bytes[1]; - let value = bytes[2..2 + length as usize].to_vec(); - if value.len() != length as usize { - return Err(IggyError::InvalidIdentifier); - } + let kind = IdKind::from_code(*bytes.first().ok_or(IggyError::InvalidIdentifier)?)?; + let length = *bytes.get(1).ok_or(IggyError::InvalidIdentifier)?; + let value = bytes + .get(2..2 + length as usize) + .ok_or(IggyError::InvalidIdentifier)? + .to_vec(); let identifier = Identifier { kind, @@ -234,10 +234,10 @@ impl BytesSerializable for Identifier { let kind = IdKind::from_code(bytes[0])?; let length = bytes[1]; - let value = bytes[2..2 + length as usize].to_vec(); - if value.len() != length as usize { - return Err(IggyError::InvalidIdentifier); - } + let value = bytes + .get(2..2 + length as usize) + .ok_or(IggyError::InvalidIdentifier)? + .to_vec(); let identifier = Identifier { kind, @@ -382,6 +382,55 @@ mod tests { assert!(Identifier::named(&"a".repeat(256)).is_err()); } + #[test] + fn from_bytes_should_fail_on_empty_input() { + assert!(Identifier::from_bytes(Bytes::new()).is_err()); + } + + #[test] + fn from_bytes_should_fail_on_truncated_input() { + let id = Identifier::numeric(42).unwrap(); + let bytes = id.to_bytes(); + for i in 0..bytes.len() - 1 { + let truncated = bytes.slice(..i); + assert!( + Identifier::from_bytes(truncated).is_err(), + "expected error for truncation at byte {i}" + ); + } + } + + #[test] + fn from_bytes_should_fail_on_corrupted_length() { + let mut buf = BytesMut::new(); + buf.put_u8(1); // Numeric kind + buf.put_u8(255); // length = 255 but only 2 bytes of value follow + buf.put_u16_le(0); + assert!(Identifier::from_bytes(buf.freeze()).is_err()); + } + + #[test] + fn from_raw_bytes_should_fail_on_empty_input() { + assert!(Identifier::from_raw_bytes(&[]).is_err()); + } + + #[test] + fn from_raw_bytes_should_fail_on_truncated_input() { + let id = Identifier::numeric(42).unwrap(); + let bytes = id.to_bytes(); + for i in 0..bytes.len() - 1 { + assert!( + Identifier::from_raw_bytes(&bytes[..i]).is_err(), + "expected error for truncation at byte {i}" + ); + } + } + + #[test] + fn from_raw_bytes_should_fail_on_corrupted_length() { + assert!(Identifier::from_raw_bytes(&[1, 255, 0, 0]).is_err()); + } + #[test] fn numeric_id_should_be_converted_into_identifier_using_trait() { let id = 1; diff --git a/core/common/src/types/message/message_view.rs b/core/common/src/types/message/message_view.rs index 1f1442ea7..c9a7bb76f 100644 --- a/core/common/src/types/message/message_view.rs +++ b/core/common/src/types/message/message_view.rs @@ -108,7 +108,10 @@ impl<'a> IggyMessageView<'a> { pub fn payload(&self) -> &[u8] { let header_view = self.header(); let payload_len = header_view.payload_length(); - &self.buffer[self.payload_offset..self.payload_offset + payload_len] + let end = self.payload_offset + payload_len; + self.buffer + .get(self.payload_offset..end) + .unwrap_or_default() } /// Validates that the message view is properly formatted and has valid data. @@ -131,9 +134,13 @@ impl<'a> IggyMessageView<'a> { /// Validates that the message view has a valid checksum. /// This should be called only on server side. pub fn calculate_checksum(&self) -> u64 { - let checksum_field_size = size_of::<u64>(); // Skip checksum field for checksum calculation + let checksum_field_size = size_of::<u64>(); let size = self.size() - checksum_field_size; - let data = &self.buffer[checksum_field_size..checksum_field_size + size]; + let end = checksum_field_size + size; + let data = self + .buffer + .get(checksum_field_size..end) + .unwrap_or_default(); checksum::calculate_checksum(data) } } @@ -186,8 +193,17 @@ impl<'a> Iterator for IggyMessageViewIterator<'a> { } let remaining = &self.buffer[self.position..]; + if remaining.len() < IGGY_MESSAGE_HEADER_SIZE { + return None; + } + let view = IggyMessageView::new(remaining); - self.position += view.size(); + let size = view.size(); + if size > remaining.len() { + return None; + } + + self.position += size; Some(view) } } diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs index 713d64ab2..477265b27 100644 --- a/core/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs @@ -85,9 +85,11 @@ impl ServerCommandHandler for SendMessages { self.partitioning = partitioning; let messages_count = u32::from_le_bytes( - metadata_buf[element_size..element_size + 4] + metadata_buf + .get(element_size..element_size + 4) + .ok_or(IggyError::InvalidCommand)? .try_into() - .unwrap(), + .map_err(|_| IggyError::InvalidNumberEncoding)?, ); let indexes_size = messages_count as usize * INDEX_SIZE;
