This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 8594d7d4c fix(security): harden all deserializers against malformed
packets (#2741)
8594d7d4c is described below
commit 8594d7d4cf391673bcd89e5b8959db19edb522e1
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Feb 16 20:37:35 2026 +0100
fix(security): harden all deserializers against malformed packets (#2741)
A single truncated or corrupted network packet could panic the
server via unchecked slice indexing in from_bytes/from_raw_bytes
across 24 command deserializers. Since each IggyShard runs on a
single compio thread, one panic kills the entire shard, which in
turn would kill whole server.
Replace every direct bytes[n] and bytes[a..b] access with
checked .get().ok_or(IggyError::InvalidCommand) so malformed
input returns an error instead of crashing.
Besides that:
- IggyMessageView::new() now returns Result, validating full
message size upfront; payload()/calculate_checksum() no
longer use unwrap_or_default() on potentially corrupt data
- update_permissions had an .unwrap() on attacker-controlled
permissions_length - replaced with proper error propagation
- send_messages_handler message_size arithmetic could underflow
replaced with checked_sub() chain
- LoginUser now rejects 1-3 trailing bytes as corrupt instead
of silently ignoring incomplete length prefixes
- Removed dead name.len() != name_length guards that were
unreachable after .get() + from_utf8() validation
---
.../consumer_groups/create_consumer_group.rs | 30 +++-
.../consumer_offsets/delete_consumer_offset.rs | 10 +-
.../consumer_offsets/get_consumer_offset.rs | 10 +-
.../consumer_offsets/store_consumer_offset.rs | 14 +-
.../src/commands/messages/flush_unsaved_buffer.rs | 6 +-
core/common/src/commands/messages/poll_messages.rs | 24 ++-
.../src/commands/partitions/create_partitions.rs | 4 +-
.../src/commands/partitions/delete_partitions.rs | 4 +-
.../create_personal_access_token.rs | 23 +--
.../delete_personal_access_token.rs | 41 ++++-
.../login_with_personal_access_token.rs | 41 ++++-
.../src/commands/segments/delete_segments.rs | 8 +-
core/common/src/commands/streams/create_stream.rs | 41 ++++-
core/common/src/commands/streams/update_stream.rs | 15 +-
core/common/src/commands/topics/create_topic.rs | 59 +++++--
core/common/src/commands/topics/update_topic.rs | 55 ++++--
core/common/src/commands/users/change_password.rs | 43 ++++-
core/common/src/commands/users/create_user.rs | 78 ++++++---
core/common/src/commands/users/login_user.rs | 193 ++++++++++++++++-----
.../src/commands/users/update_permissions.rs | 21 ++-
core/common/src/commands/users/update_user.rs | 41 ++++-
core/common/src/types/identifier/mod.rs | 77 ++++++--
core/common/src/types/message/message_view.rs | 78 ++++-----
core/common/src/types/message/messages_batch.rs | 7 +-
.../common/src/types/message/messages_batch_mut.rs | 38 ++--
core/common/src/types/message/partitioning.rs | 147 +++++++++++++---
.../handlers/messages/send_messages_handler.rs | 47 +++--
core/server/src/http/http_shard_wrapper.rs | 5 +-
28 files changed, 868 insertions(+), 292 deletions(-)
diff --git a/core/common/src/commands/consumer_groups/create_consumer_group.rs
b/core/common/src/commands/consumer_groups/create_consumer_group.rs
index c03d39964..f697bc9c7 100644
--- a/core/common/src/commands/consumer_groups/create_consumer_group.rs
+++ b/core/common/src/commands/consumer_groups/create_consumer_group.rs
@@ -96,10 +96,14 @@ impl BytesSerializable for CreateConsumerGroup {
position += stream_id.get_size_bytes().as_bytes_usize();
let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
position += topic_id.get_size_bytes().as_bytes_usize();
- let name_length = bytes[position];
- let name = from_utf8(&bytes[position + 1..position + 1 + name_length
as usize])
- .map_err(|_| IggyError::InvalidUtf8)?
- .to_string();
+ let name_length =
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
+ let name = from_utf8(
+ bytes
+ .get(position + 1..position + 1 + name_length as usize)
+ .ok_or(IggyError::InvalidCommand)?,
+ )
+ .map_err(|_| IggyError::InvalidUtf8)?
+ .to_string();
let command = CreateConsumerGroup {
stream_id,
topic_id,
@@ -142,6 +146,24 @@ mod tests {
assert_eq!(name, command.name);
}
+ #[test]
+ fn from_bytes_should_fail_on_empty_input() {
+ assert!(CreateConsumerGroup::from_bytes(Bytes::new()).is_err());
+ }
+
+ #[test]
+ fn from_bytes_should_fail_on_truncated_input() {
+ let command = CreateConsumerGroup::default();
+ let bytes = command.to_bytes();
+ for i in 0..bytes.len() - 1 {
+ let truncated = bytes.slice(..i);
+ assert!(
+ CreateConsumerGroup::from_bytes(truncated).is_err(),
+ "expected error for truncation at byte {i}"
+ );
+ }
+ }
+
#[test]
fn should_be_deserialized_from_bytes() {
let stream_id = Identifier::numeric(1).unwrap();
diff --git
a/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs
b/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs
index b608a8d55..b0fc12aa2 100644
--- a/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs
+++ b/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs
@@ -99,7 +99,8 @@ impl BytesSerializable for DeleteConsumerOffset {
}
let mut position = 0;
- let consumer_kind = ConsumerKind::from_code(bytes[0])?;
+ let consumer_kind =
+
ConsumerKind::from_code(*bytes.first().ok_or(IggyError::InvalidCommand)?)?;
let consumer_id = Identifier::from_bytes(bytes.slice(1..))?;
position += 1 + consumer_id.get_size_bytes().as_bytes_usize();
let consumer = Consumer {
@@ -110,10 +111,11 @@ impl BytesSerializable for DeleteConsumerOffset {
position += stream_id.get_size_bytes().as_bytes_usize();
let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
position += topic_id.get_size_bytes().as_bytes_usize();
- // Decode partition_id with flag byte: 1 = Some, 0 = None
- let has_partition_id = bytes[position];
+ let has_partition_id =
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
let partition_id_value = u32::from_le_bytes(
- bytes[position + 1..position + 5]
+ bytes
+ .get(position + 1..position + 5)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
diff --git a/core/common/src/commands/consumer_offsets/get_consumer_offset.rs
b/core/common/src/commands/consumer_offsets/get_consumer_offset.rs
index 0f542e09f..f27624948 100644
--- a/core/common/src/commands/consumer_offsets/get_consumer_offset.rs
+++ b/core/common/src/commands/consumer_offsets/get_consumer_offset.rs
@@ -104,7 +104,8 @@ impl BytesSerializable for GetConsumerOffset {
}
let mut position = 0;
- let consumer_kind = ConsumerKind::from_code(bytes[0])?;
+ let consumer_kind =
+
ConsumerKind::from_code(*bytes.first().ok_or(IggyError::InvalidCommand)?)?;
let consumer_id = Identifier::from_bytes(bytes.slice(1..))?;
position += 1 + consumer_id.get_size_bytes().as_bytes_usize();
let consumer = Consumer {
@@ -115,10 +116,11 @@ impl BytesSerializable for GetConsumerOffset {
position += stream_id.get_size_bytes().as_bytes_usize();
let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
position += topic_id.get_size_bytes().as_bytes_usize();
- // Decode partition_id with flag byte: 1 = Some, 0 = None
- let has_partition_id = bytes[position];
+ let has_partition_id =
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
let partition_id_value = u32::from_le_bytes(
- bytes[position + 1..position + 5]
+ bytes
+ .get(position + 1..position + 5)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
diff --git a/core/common/src/commands/consumer_offsets/store_consumer_offset.rs
b/core/common/src/commands/consumer_offsets/store_consumer_offset.rs
index c7705fbfb..d8512df75 100644
--- a/core/common/src/commands/consumer_offsets/store_consumer_offset.rs
+++ b/core/common/src/commands/consumer_offsets/store_consumer_offset.rs
@@ -104,7 +104,8 @@ impl BytesSerializable for StoreConsumerOffset {
}
let mut position = 0;
- let consumer_kind = ConsumerKind::from_code(bytes[0])?;
+ let consumer_kind =
+
ConsumerKind::from_code(*bytes.first().ok_or(IggyError::InvalidCommand)?)?;
let consumer_id = Identifier::from_bytes(bytes.slice(1..))?;
position += 1 + consumer_id.get_size_bytes().as_bytes_usize();
let consumer = Consumer {
@@ -115,10 +116,11 @@ impl BytesSerializable for StoreConsumerOffset {
position += stream_id.get_size_bytes().as_bytes_usize();
let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
position += topic_id.get_size_bytes().as_bytes_usize();
- // Decode partition_id with flag byte: 1 = Some, 0 = None
- let has_partition_id = bytes[position];
+ let has_partition_id =
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
let partition_id_value = u32::from_le_bytes(
- bytes[position + 1..position + 5]
+ bytes
+ .get(position + 1..position + 5)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
@@ -128,7 +130,9 @@ impl BytesSerializable for StoreConsumerOffset {
None
};
let offset = u64::from_le_bytes(
- bytes[position + 5..position + 13]
+ bytes
+ .get(position + 5..position + 13)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
diff --git a/core/common/src/commands/messages/flush_unsaved_buffer.rs
b/core/common/src/commands/messages/flush_unsaved_buffer.rs
index beb26e5e9..2235a911b 100644
--- a/core/common/src/commands/messages/flush_unsaved_buffer.rs
+++ b/core/common/src/commands/messages/flush_unsaved_buffer.rs
@@ -86,12 +86,14 @@ impl BytesSerializable for FlushUnsavedBuffer {
let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
position += topic_id.to_bytes().len();
let partition_id = u32::from_le_bytes(
- bytes[position..position + 4]
+ bytes
+ .get(position..position + 4)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
position += 4;
- let fsync = bytes[position] == 1;
+ let fsync = *bytes.get(position).ok_or(IggyError::InvalidCommand)? ==
1;
Ok(FlushUnsavedBuffer {
stream_id,
topic_id,
diff --git a/core/common/src/commands/messages/poll_messages.rs
b/core/common/src/commands/messages/poll_messages.rs
index c52b335c8..ac266ca4c 100644
--- a/core/common/src/commands/messages/poll_messages.rs
+++ b/core/common/src/commands/messages/poll_messages.rs
@@ -157,7 +157,8 @@ impl BytesSerializable for PollMessages {
}
let mut position = 0;
- let consumer_kind = ConsumerKind::from_code(bytes[0])?;
+ let consumer_kind =
+
ConsumerKind::from_code(*bytes.first().ok_or(IggyError::InvalidCommand)?)?;
let consumer_id = Identifier::from_bytes(bytes.slice(1..))?;
position += 1 + consumer_id.get_size_bytes().as_bytes_usize();
let consumer = Consumer {
@@ -168,10 +169,11 @@ impl BytesSerializable for PollMessages {
position += stream_id.get_size_bytes().as_bytes_usize();
let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
position += topic_id.get_size_bytes().as_bytes_usize();
- // Decode partition_id with flag byte: 1 = Some, 0 = None
- let has_partition_id = bytes[position];
+ let has_partition_id =
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
let partition_id_value = u32::from_le_bytes(
- bytes[position + 1..position + 5]
+ bytes
+ .get(position + 1..position + 5)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
@@ -180,10 +182,13 @@ impl BytesSerializable for PollMessages {
} else {
None
};
- let polling_kind = PollingKind::from_code(bytes[position + 5])?;
+ let polling_kind =
+ PollingKind::from_code(*bytes.get(position +
5).ok_or(IggyError::InvalidCommand)?)?;
position += 6;
let value = u64::from_le_bytes(
- bytes[position..position + 8]
+ bytes
+ .get(position..position + 8)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
@@ -192,12 +197,13 @@ impl BytesSerializable for PollMessages {
value,
};
let count = u32::from_le_bytes(
- bytes[position + 8..position + 12]
+ bytes
+ .get(position + 8..position + 12)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
- let auto_commit = bytes[position + 12];
- let auto_commit = matches!(auto_commit, 1);
+ let auto_commit = *bytes.get(position +
12).ok_or(IggyError::InvalidCommand)? == 1;
let command = PollMessages {
consumer,
stream_id,
diff --git a/core/common/src/commands/partitions/create_partitions.rs
b/core/common/src/commands/partitions/create_partitions.rs
index a01d3b26d..2bd9ee454 100644
--- a/core/common/src/commands/partitions/create_partitions.rs
+++ b/core/common/src/commands/partitions/create_partitions.rs
@@ -92,7 +92,9 @@ impl BytesSerializable for CreatePartitions {
let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
position += topic_id.get_size_bytes().as_bytes_usize();
let partitions_count = u32::from_le_bytes(
- bytes[position..position + 4]
+ bytes
+ .get(position..position + 4)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
diff --git a/core/common/src/commands/partitions/delete_partitions.rs
b/core/common/src/commands/partitions/delete_partitions.rs
index 12be4448a..9e0cf1ddc 100644
--- a/core/common/src/commands/partitions/delete_partitions.rs
+++ b/core/common/src/commands/partitions/delete_partitions.rs
@@ -92,7 +92,9 @@ impl BytesSerializable for DeletePartitions {
let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
position += topic_id.get_size_bytes().as_bytes_usize();
let partitions_count = u32::from_le_bytes(
- bytes[position..position + 4]
+ bytes
+ .get(position..position + 4)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
diff --git
a/core/common/src/commands/personal_access_tokens/create_personal_access_token.rs
b/core/common/src/commands/personal_access_tokens/create_personal_access_token.rs
index fa457a33e..395822854 100644
---
a/core/common/src/commands/personal_access_tokens/create_personal_access_token.rs
+++
b/core/common/src/commands/personal_access_tokens/create_personal_access_token.rs
@@ -82,17 +82,20 @@ impl BytesSerializable for CreatePersonalAccessToken {
return Err(IggyError::InvalidCommand);
}
- let name_length = bytes[0];
- let name = from_utf8(&bytes.slice(1..1 + name_length as usize))
- .map_err(|_| IggyError::InvalidUtf8)?
- .to_string();
- if name.len() != name_length as usize {
- return Err(IggyError::InvalidCommand);
- }
-
- let position = 1 + name_length as usize;
+ let name_length = *bytes.first().ok_or(IggyError::InvalidCommand)? as
usize;
+ let name = from_utf8(
+ bytes
+ .get(1..1 + name_length)
+ .ok_or(IggyError::InvalidCommand)?,
+ )
+ .map_err(|_| IggyError::InvalidUtf8)?
+ .to_string();
+
+ let position = 1 + name_length;
let expiry = u64::from_le_bytes(
- bytes[position..position + 8]
+ bytes
+ .get(position..position + 8)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
diff --git
a/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs
b/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs
index 064f78c48..0d19b68c9 100644
---
a/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs
+++
b/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs
@@ -76,13 +76,14 @@ impl BytesSerializable for DeletePersonalAccessToken {
return Err(IggyError::InvalidCommand);
}
- let name_length = bytes[0];
- let name = from_utf8(&bytes[1..1 + name_length as usize])
- .map_err(|_| IggyError::InvalidUtf8)?
- .to_string();
- if name.len() != name_length as usize {
- return Err(IggyError::InvalidCommand);
- }
+ let name_length = *bytes.first().ok_or(IggyError::InvalidCommand)? as
usize;
+ let name = from_utf8(
+ bytes
+ .get(1..1 + name_length)
+ .ok_or(IggyError::InvalidCommand)?,
+ )
+ .map_err(|_| IggyError::InvalidUtf8)?
+ .to_string();
let command = DeletePersonalAccessToken { name };
Ok(command)
@@ -112,6 +113,32 @@ mod tests {
assert_eq!(name, command.name);
}
+ #[test]
+ fn from_bytes_should_fail_on_empty_input() {
+ assert!(DeletePersonalAccessToken::from_bytes(Bytes::new()).is_err());
+ }
+
+ #[test]
+ fn from_bytes_should_fail_on_truncated_input() {
+ let command = DeletePersonalAccessToken::default();
+ let bytes = command.to_bytes();
+ for i in 0..bytes.len() - 1 {
+ let truncated = bytes.slice(..i);
+ assert!(
+ DeletePersonalAccessToken::from_bytes(truncated).is_err(),
+ "expected error for truncation at byte {i}"
+ );
+ }
+ }
+
+ #[test]
+ fn from_bytes_should_fail_on_corrupted_name_length() {
+ let mut buf = BytesMut::new();
+ buf.put_u8(255);
+ buf.put_slice(b"short");
+ assert!(DeletePersonalAccessToken::from_bytes(buf.freeze()).is_err());
+ }
+
#[test]
fn should_be_deserialized_from_bytes() {
let name = "test";
diff --git
a/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs
b/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs
index cdba04e37..59fafa9c1 100644
---
a/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs
+++
b/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs
@@ -73,13 +73,14 @@ impl BytesSerializable for LoginWithPersonalAccessToken {
return Err(IggyError::InvalidCommand);
}
- let token_length = bytes[0];
- let token = from_utf8(&bytes[1..1 + token_length as usize])
- .map_err(|_| IggyError::InvalidUtf8)?
- .to_string();
- if token.len() != token_length as usize {
- return Err(IggyError::InvalidCommand);
- }
+ let token_length = *bytes.first().ok_or(IggyError::InvalidCommand)? as
usize;
+ let token = from_utf8(
+ bytes
+ .get(1..1 + token_length)
+ .ok_or(IggyError::InvalidCommand)?,
+ )
+ .map_err(|_| IggyError::InvalidUtf8)?
+ .to_string();
let command = LoginWithPersonalAccessToken { token };
Ok(command)
@@ -109,6 +110,32 @@ mod tests {
assert_eq!(token, command.token);
}
+ #[test]
+ fn from_bytes_should_fail_on_empty_input() {
+
assert!(LoginWithPersonalAccessToken::from_bytes(Bytes::new()).is_err());
+ }
+
+ #[test]
+ fn from_bytes_should_fail_on_truncated_input() {
+ let command = LoginWithPersonalAccessToken::default();
+ let bytes = command.to_bytes();
+ for i in 0..bytes.len() - 1 {
+ let truncated = bytes.slice(..i);
+ assert!(
+ LoginWithPersonalAccessToken::from_bytes(truncated).is_err(),
+ "expected error for truncation at byte {i}"
+ );
+ }
+ }
+
+ #[test]
+ fn from_bytes_should_fail_on_corrupted_token_length() {
+ let mut buf = BytesMut::new();
+ buf.put_u8(255);
+ buf.put_slice(b"short");
+
assert!(LoginWithPersonalAccessToken::from_bytes(buf.freeze()).is_err());
+ }
+
#[test]
fn should_be_deserialized_from_bytes() {
let token = "test";
diff --git a/core/common/src/commands/segments/delete_segments.rs
b/core/common/src/commands/segments/delete_segments.rs
index 841a84ac7..c57833f4c 100644
--- a/core/common/src/commands/segments/delete_segments.rs
+++ b/core/common/src/commands/segments/delete_segments.rs
@@ -97,13 +97,17 @@ impl BytesSerializable for DeleteSegments {
let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
position += topic_id.get_size_bytes().as_bytes_usize();
let partition_id = u32::from_le_bytes(
- bytes[position..position + std::mem::size_of::<u32>()]
+ bytes
+ .get(position..position + std::mem::size_of::<u32>())
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
position += std::mem::size_of::<u32>();
let segments_count = u32::from_le_bytes(
- bytes[position..position + std::mem::size_of::<u32>()]
+ bytes
+ .get(position..position + std::mem::size_of::<u32>())
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
diff --git a/core/common/src/commands/streams/create_stream.rs
b/core/common/src/commands/streams/create_stream.rs
index 9ff2fec5a..d5964eb5e 100644
--- a/core/common/src/commands/streams/create_stream.rs
+++ b/core/common/src/commands/streams/create_stream.rs
@@ -73,13 +73,14 @@ impl BytesSerializable for CreateStream {
return Err(IggyError::InvalidCommand);
}
- let name_length = bytes[0];
- let name = from_utf8(&bytes[1..1 + name_length as usize])
- .map_err(|_| IggyError::InvalidUtf8)?
- .to_string();
- if name.len() != name_length as usize {
- return Err(IggyError::InvalidCommand);
- }
+ let name_length = *bytes.first().ok_or(IggyError::InvalidCommand)? as
usize;
+ let name = from_utf8(
+ bytes
+ .get(1..1 + name_length)
+ .ok_or(IggyError::InvalidCommand)?,
+ )
+ .map_err(|_| IggyError::InvalidUtf8)?
+ .to_string();
let command = CreateStream { name };
Ok(command)
@@ -110,6 +111,32 @@ mod tests {
assert_eq!(name, command.name);
}
+ #[test]
+ fn from_bytes_should_fail_on_empty_input() {
+ assert!(CreateStream::from_bytes(Bytes::new()).is_err());
+ }
+
+ #[test]
+ fn from_bytes_should_fail_on_truncated_input() {
+ let command = CreateStream::default();
+ let bytes = command.to_bytes();
+ for i in 0..bytes.len() - 1 {
+ let truncated = bytes.slice(..i);
+ assert!(
+ CreateStream::from_bytes(truncated).is_err(),
+ "expected error for truncation at byte {i}"
+ );
+ }
+ }
+
+ #[test]
+ fn from_bytes_should_fail_on_corrupted_name_length() {
+ let mut buf = BytesMut::new();
+ buf.put_u8(255);
+ buf.put_slice(b"short");
+ assert!(CreateStream::from_bytes(buf.freeze()).is_err());
+ }
+
#[test]
fn should_be_deserialized_from_bytes() {
let name = "test".to_string();
diff --git a/core/common/src/commands/streams/update_stream.rs
b/core/common/src/commands/streams/update_stream.rs
index ce3600f88..a202105bf 100644
--- a/core/common/src/commands/streams/update_stream.rs
+++ b/core/common/src/commands/streams/update_stream.rs
@@ -85,13 +85,14 @@ impl BytesSerializable for UpdateStream {
let mut position = 0;
let stream_id = Identifier::from_bytes(bytes.clone())?;
position += stream_id.get_size_bytes().as_bytes_usize();
- let name_length = bytes[position];
- let name = from_utf8(&bytes[position + 1..position + 1 + name_length
as usize])
- .map_err(|_| IggyError::InvalidUtf8)?
- .to_string();
- if name.len() != name_length as usize {
- return Err(IggyError::InvalidCommand);
- }
+ let name_length =
*bytes.get(position).ok_or(IggyError::InvalidCommand)? as usize;
+ let name = from_utf8(
+ bytes
+ .get(position + 1..position + 1 + name_length)
+ .ok_or(IggyError::InvalidCommand)?,
+ )
+ .map_err(|_| IggyError::InvalidUtf8)?
+ .to_string();
let command = UpdateStream { stream_id, name };
Ok(command)
diff --git a/core/common/src/commands/topics/create_topic.rs
b/core/common/src/commands/topics/create_topic.rs
index 48cd3d525..bd4d87f48 100644
--- a/core/common/src/commands/topics/create_topic.rs
+++ b/core/common/src/commands/topics/create_topic.rs
@@ -127,34 +127,43 @@ impl BytesSerializable for CreateTopic {
let stream_id = Identifier::from_bytes(bytes.clone())?;
position += stream_id.get_size_bytes().as_bytes_usize();
let partitions_count = u32::from_le_bytes(
- bytes[position..position + 4]
+ bytes
+ .get(position..position + 4)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
- let compression_algorithm =
CompressionAlgorithm::from_code(bytes[position + 4])?;
+ let compression_algorithm = CompressionAlgorithm::from_code(
+ *bytes.get(position + 4).ok_or(IggyError::InvalidCommand)?,
+ )?;
let message_expiry = u64::from_le_bytes(
- bytes[position + 5..position + 13]
+ bytes
+ .get(position + 5..position + 13)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let message_expiry: IggyExpiry = message_expiry.into();
let max_topic_size = u64::from_le_bytes(
- bytes[position + 13..position + 21]
+ bytes
+ .get(position + 13..position + 21)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let max_topic_size: MaxTopicSize = max_topic_size.into();
- let replication_factor = match bytes[position + 21] {
+ let replication_factor = match *bytes.get(position +
21).ok_or(IggyError::InvalidCommand)? {
0 => None,
factor => Some(factor),
};
- let name_length = bytes[position + 22];
- let name = from_utf8(&bytes[position + 23..(position + 23 +
name_length as usize)])
- .map_err(|_| IggyError::InvalidUtf8)?
- .to_string();
- if name.len() != name_length as usize {
- return Err(IggyError::InvalidCommand);
- }
+ let name_length = *bytes.get(position +
22).ok_or(IggyError::InvalidCommand)? as usize;
+ let name = from_utf8(
+ bytes
+ .get(position + 23..position + 23 + name_length)
+ .ok_or(IggyError::InvalidCommand)?,
+ )
+ .map_err(|_| IggyError::InvalidUtf8)?
+ .to_string();
let command = CreateTopic {
stream_id,
partitions_count,
@@ -229,6 +238,32 @@ mod tests {
assert_eq!(name, command.name);
}
+ #[test]
+ fn from_bytes_should_fail_on_empty_input() {
+ assert!(CreateTopic::from_bytes(Bytes::new()).is_err());
+ }
+
+ #[test]
+ fn from_bytes_should_fail_on_truncated_input() {
+ let command = CreateTopic {
+ stream_id: Identifier::numeric(1).unwrap(),
+ partitions_count: 3,
+ compression_algorithm: CompressionAlgorithm::None,
+ message_expiry: IggyExpiry::NeverExpire,
+ max_topic_size: MaxTopicSize::ServerDefault,
+ replication_factor: Some(1),
+ name: "test".to_string(),
+ };
+ let bytes = command.to_bytes();
+ for i in 0..bytes.len() - 1 {
+ let truncated = bytes.slice(..i);
+ assert!(
+ CreateTopic::from_bytes(truncated).is_err(),
+ "expected error for truncation at byte {i}"
+ );
+ }
+ }
+
#[test]
fn should_be_deserialized_from_bytes() {
let stream_id = Identifier::numeric(1).unwrap();
diff --git a/core/common/src/commands/topics/update_topic.rs
b/core/common/src/commands/topics/update_topic.rs
index 83dadb69c..6f7bf5a02 100644
--- a/core/common/src/commands/topics/update_topic.rs
+++ b/core/common/src/commands/topics/update_topic.rs
@@ -128,31 +128,38 @@ impl BytesSerializable for UpdateTopic {
position += stream_id.get_size_bytes().as_bytes_usize();
let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
position += topic_id.get_size_bytes().as_bytes_usize();
- let compression_algorithm =
CompressionAlgorithm::from_code(bytes[position])?;
+ let compression_algorithm = CompressionAlgorithm::from_code(
+ *bytes.get(position).ok_or(IggyError::InvalidCommand)?,
+ )?;
position += 1;
let message_expiry = u64::from_le_bytes(
- bytes[position..position + 8]
+ bytes
+ .get(position..position + 8)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let message_expiry: IggyExpiry = message_expiry.into();
let max_topic_size = u64::from_le_bytes(
- bytes[position + 8..position + 16]
+ bytes
+ .get(position + 8..position + 16)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
let max_topic_size: MaxTopicSize = max_topic_size.into();
- let replication_factor = match bytes[position + 16] {
+ let replication_factor = match *bytes.get(position +
16).ok_or(IggyError::InvalidCommand)? {
0 => None,
factor => Some(factor),
};
- let name_length = bytes[position + 17];
- let name = from_utf8(&bytes[position + 18..(position + 18 +
name_length as usize)])
- .map_err(|_| IggyError::InvalidUtf8)?
- .to_string();
- if name.len() != name_length as usize {
- return Err(IggyError::InvalidCommand);
- }
+ let name_length = *bytes.get(position +
17).ok_or(IggyError::InvalidCommand)? as usize;
+ let name = from_utf8(
+ bytes
+ .get(position + 18..position + 18 + name_length)
+ .ok_or(IggyError::InvalidCommand)?,
+ )
+ .map_err(|_| IggyError::InvalidUtf8)?
+ .to_string();
let command = UpdateTopic {
stream_id,
topic_id,
@@ -229,6 +236,32 @@ mod tests {
assert_eq!(name, command.name);
}
+ #[test]
+ fn from_bytes_should_fail_on_empty_input() {
+ assert!(UpdateTopic::from_bytes(Bytes::new()).is_err());
+ }
+
+ #[test]
+ fn from_bytes_should_fail_on_truncated_input() {
+ let command = UpdateTopic {
+ stream_id: Identifier::numeric(1).unwrap(),
+ topic_id: Identifier::numeric(2).unwrap(),
+ compression_algorithm: CompressionAlgorithm::None,
+ message_expiry: IggyExpiry::NeverExpire,
+ max_topic_size: MaxTopicSize::ServerDefault,
+ replication_factor: Some(1),
+ name: "test".to_string(),
+ };
+ let bytes = command.to_bytes();
+ for i in 0..bytes.len() - 1 {
+ let truncated = bytes.slice(..i);
+ assert!(
+ UpdateTopic::from_bytes(truncated).is_err(),
+ "expected error for truncation at byte {i}"
+ );
+ }
+ }
+
#[test]
fn should_be_deserialized_from_bytes() {
let stream_id = Identifier::numeric(1).unwrap();
diff --git a/core/common/src/commands/users/change_password.rs
b/core/common/src/commands/users/change_password.rs
index 4a5663778..ba8e251db 100644
--- a/core/common/src/commands/users/change_password.rs
+++ b/core/common/src/commands/users/change_password.rs
@@ -101,18 +101,25 @@ impl BytesSerializable for ChangePassword {
let user_id = Identifier::from_bytes(bytes.clone())?;
let mut position = user_id.get_size_bytes().as_bytes_usize();
- let current_password_length = bytes[position];
+ let current_password_length =
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
position += 1;
- let current_password =
- from_utf8(&bytes[position..position + current_password_length as
usize])
- .map_err(|_| IggyError::InvalidUtf8)?
- .to_string();
+ let current_password = from_utf8(
+ bytes
+ .get(position..position + current_password_length as usize)
+ .ok_or(IggyError::InvalidCommand)?,
+ )
+ .map_err(|_| IggyError::InvalidUtf8)?
+ .to_string();
position += current_password_length as usize;
- let new_password_length = bytes[position];
+ let new_password_length =
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
position += 1;
- let new_password = from_utf8(&bytes[position..position +
new_password_length as usize])
- .map_err(|_| IggyError::InvalidUtf8)?
- .to_string();
+ let new_password = from_utf8(
+ bytes
+ .get(position..position + new_password_length as usize)
+ .ok_or(IggyError::InvalidCommand)?,
+ )
+ .map_err(|_| IggyError::InvalidUtf8)?
+ .to_string();
let command = ChangePassword {
user_id,
@@ -160,6 +167,24 @@ mod tests {
assert_eq!(new_password, command.new_password);
}
+ #[test]
+ fn from_bytes_should_fail_on_empty_input() {
+ assert!(ChangePassword::from_bytes(Bytes::new()).is_err());
+ }
+
+ #[test]
+ fn from_bytes_should_fail_on_truncated_input() {
+ let command = ChangePassword::default();
+ let bytes = command.to_bytes();
+ for i in 0..bytes.len() - 1 {
+ let truncated = bytes.slice(..i);
+ assert!(
+ ChangePassword::from_bytes(truncated).is_err(),
+ "expected error for truncation at byte {i}"
+ );
+ }
+ }
+
#[test]
fn should_be_deserialized_from_bytes() {
let user_id = Identifier::numeric(1).unwrap();
diff --git a/core/common/src/commands/users/create_user.rs
b/core/common/src/commands/users/create_user.rs
index 7d23c54ee..6aa510ad6 100644
--- a/core/common/src/commands/users/create_user.rs
+++ b/core/common/src/commands/users/create_user.rs
@@ -110,28 +110,30 @@ impl BytesSerializable for CreateUser {
return Err(IggyError::InvalidCommand);
}
- let username_length = bytes[0];
- let username = from_utf8(&bytes[1..1 + username_length as usize])
- .map_err(|_| IggyError::InvalidUtf8)?
- .to_string();
- if username.len() != username_length as usize {
- return Err(IggyError::InvalidCommand);
- }
+ let username_length = *bytes.first().ok_or(IggyError::InvalidCommand)?
as usize;
+ let username = from_utf8(
+ bytes
+ .get(1..1 + username_length)
+ .ok_or(IggyError::InvalidCommand)?,
+ )
+ .map_err(|_| IggyError::InvalidUtf8)?
+ .to_string();
- let mut position = 1 + username_length as usize;
- let password_length = bytes[position];
+ let mut position = 1 + username_length;
+ let password_length =
*bytes.get(position).ok_or(IggyError::InvalidCommand)? as usize;
position += 1;
- let password = from_utf8(&bytes[position..position + password_length
as usize])
- .map_err(|_| IggyError::InvalidUtf8)?
- .to_string();
- if password.len() != password_length as usize {
- return Err(IggyError::InvalidCommand);
- }
+ let password = from_utf8(
+ bytes
+ .get(position..position + password_length)
+ .ok_or(IggyError::InvalidCommand)?,
+ )
+ .map_err(|_| IggyError::InvalidUtf8)?
+ .to_string();
- position += password_length as usize;
- let status = UserStatus::from_code(bytes[position])?;
+ position += password_length;
+ let status =
UserStatus::from_code(*bytes.get(position).ok_or(IggyError::InvalidCommand)?)?;
position += 1;
- let has_permissions = bytes[position];
+ let has_permissions =
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
if has_permissions > 1 {
return Err(IggyError::InvalidCommand);
}
@@ -139,14 +141,20 @@ impl BytesSerializable for CreateUser {
position += 1;
let permissions = if has_permissions == 1 {
let permissions_length = u32::from_le_bytes(
- bytes[position..position + 4]
+ bytes
+ .get(position..position + 4)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
position += 4;
- Some(Permissions::from_bytes(
- bytes.slice(position..position + permissions_length as usize),
- )?)
+ let end = position
+ .checked_add(permissions_length as usize)
+ .ok_or(IggyError::InvalidCommand)?;
+ if end > bytes.len() {
+ return Err(IggyError::InvalidCommand);
+ }
+ Some(Permissions::from_bytes(bytes.slice(position..end))?)
} else {
None
};
@@ -232,6 +240,32 @@ mod tests {
assert_eq!(permissions, command.permissions.unwrap());
}
+ #[test]
+ fn from_bytes_should_fail_on_empty_input() {
+ assert!(CreateUser::from_bytes(Bytes::new()).is_err());
+ }
+
+ #[test]
+ fn from_bytes_should_fail_on_truncated_input() {
+ let command = CreateUser::default();
+ let bytes = command.to_bytes();
+ for i in 0..bytes.len() - 1 {
+ let truncated = bytes.slice(..i);
+ assert!(
+ CreateUser::from_bytes(truncated).is_err(),
+ "expected error for truncation at byte {i}"
+ );
+ }
+ }
+
+ #[test]
+ fn from_bytes_should_fail_on_corrupted_username_length() {
+ let mut buf = BytesMut::new();
+ buf.put_u8(255);
+ buf.put_slice(b"short");
+ assert!(CreateUser::from_bytes(buf.freeze()).is_err());
+ }
+
#[test]
fn should_be_deserialized_from_bytes() {
let username = "user";
diff --git a/core/common/src/commands/users/login_user.rs
b/core/common/src/commands/users/login_user.rs
index edd1e8426..9fe907f11 100644
--- a/core/common/src/commands/users/login_user.rs
+++ b/core/common/src/commands/users/login_user.rs
@@ -114,55 +114,87 @@ impl BytesSerializable for LoginUser {
return Err(IggyError::InvalidCommand);
}
- let username_length = bytes[0];
- let username = from_utf8(&bytes[1..=(username_length as usize)])
- .map_err(|_| IggyError::InvalidUtf8)?
- .to_string();
- if username.len() != username_length as usize {
- return Err(IggyError::InvalidCommand);
- }
+ let username_length = *bytes.first().ok_or(IggyError::InvalidCommand)?
as usize;
+ let username = from_utf8(
+ bytes
+ .get(1..1 + username_length)
+ .ok_or(IggyError::InvalidCommand)?,
+ )
+ .map_err(|_| IggyError::InvalidUtf8)?
+ .to_string();
- let password_length = bytes[1 + username_length as usize];
+ let pos = 1 + username_length;
+ let password_length =
*bytes.get(pos).ok_or(IggyError::InvalidCommand)? as usize;
let password = from_utf8(
- &bytes[2 + username_length as usize
- ..2 + username_length as usize + password_length as usize],
+ bytes
+ .get(pos + 1..pos + 1 + password_length)
+ .ok_or(IggyError::InvalidCommand)?,
)
.map_err(|_| IggyError::InvalidUtf8)?
.to_string();
- if password.len() != password_length as usize {
- return Err(IggyError::InvalidCommand);
- }
- let position = 2 + username_length as usize + password_length as usize;
- let version_length = u32::from_le_bytes(
- bytes[position..position + 4]
- .try_into()
- .map_err(|_| IggyError::InvalidNumberEncoding)?,
- );
- let version = match version_length {
- 0 => None,
- _ => {
- let version =
- from_utf8(&bytes[position + 4..position + 4 +
version_length as usize])
- .map_err(|_| IggyError::InvalidUtf8)?
- .to_string();
- Some(version)
+ let mut position = pos + 1 + password_length;
+
+ // Version and context fields are optional for backward compatibility
+ // with older SDKs (e.g. v0.8.0) that don't send them.
+ // However, 1-3 trailing bytes (incomplete u32 length prefix) are
rejected
+ // as they indicate a corrupt payload rather than a valid old-SDK
format.
+ let remaining = bytes.len() - position;
+ let version = if remaining == 0 {
+ None
+ } else if remaining < 4 {
+ return Err(IggyError::InvalidCommand);
+ } else {
+ let version_length = u32::from_le_bytes(
+ bytes
+ .get(position..position + 4)
+ .ok_or(IggyError::InvalidCommand)?
+ .try_into()
+ .map_err(|_| IggyError::InvalidNumberEncoding)?,
+ );
+ position += 4;
+ match version_length {
+ 0 => None,
+ _ => {
+ let version = from_utf8(
+ bytes
+ .get(position..position + version_length as usize)
+ .ok_or(IggyError::InvalidCommand)?,
+ )
+ .map_err(|_| IggyError::InvalidUtf8)?
+ .to_string();
+ position += version_length as usize;
+ Some(version)
+ }
}
};
- let position = position + 4 + version_length as usize;
- let context_length = u32::from_le_bytes(
- bytes[position..position + 4]
- .try_into()
- .map_err(|_| IggyError::InvalidNumberEncoding)?,
- );
- let context = match context_length {
- 0 => None,
- _ => {
- let context =
- from_utf8(&bytes[position + 4..position + 4 +
context_length as usize])
- .map_err(|_| IggyError::InvalidUtf8)?
- .to_string();
- Some(context)
+
+ let remaining = bytes.len() - position;
+ let context = if remaining == 0 {
+ None
+ } else if remaining < 4 {
+ return Err(IggyError::InvalidCommand);
+ } else {
+ let context_length = u32::from_le_bytes(
+ bytes
+ .get(position..position + 4)
+ .ok_or(IggyError::InvalidCommand)?
+ .try_into()
+ .map_err(|_| IggyError::InvalidNumberEncoding)?,
+ );
+ position += 4;
+ match context_length {
+ 0 => None,
+ _ => {
+ let context = from_utf8(
+ bytes
+ .get(position..position + context_length as usize)
+ .ok_or(IggyError::InvalidCommand)?,
+ )
+ .map_err(|_| IggyError::InvalidUtf8)?
+ .to_string();
+ Some(context)
+ }
}
};
@@ -226,6 +258,85 @@ mod tests {
assert_eq!(context, command.context);
}
+ #[test]
+ fn from_bytes_should_fail_on_empty_input() {
+ assert!(LoginUser::from_bytes(Bytes::new()).is_err());
+ }
+
+ #[test]
+ fn from_bytes_should_fail_on_truncated_input() {
+ let command = LoginUser {
+ username: "user".to_string(),
+ password: "secret".to_string(),
+ version: Some("1.0.0".to_string()),
+ context: Some("test".to_string()),
+ };
+ let bytes = command.to_bytes();
+ // Truncate at every position up to (but not including) the version
field.
+ // Positions within username/password must error; positions at or past
the
+ // version boundary are valid old-SDK payloads.
+ let version_offset = 2 + command.username.len() +
command.password.len();
+ for i in 0..version_offset {
+ let truncated = bytes.slice(..i);
+ assert!(
+ LoginUser::from_bytes(truncated).is_err(),
+ "expected error for truncation at byte {i}"
+ );
+ }
+ }
+
+ #[test]
+ fn from_bytes_should_fail_on_corrupted_username_length() {
+ let mut buf = BytesMut::new();
+ buf.put_u8(255); // username_length = 255
+ buf.put_slice(b"short");
+ assert!(LoginUser::from_bytes(buf.freeze()).is_err());
+ }
+
+ #[test]
+ fn from_bytes_should_fail_on_trailing_bytes() {
+ let username = "user";
+ let password = "secret";
+ let mut bytes = BytesMut::new();
+ #[allow(clippy::cast_possible_truncation)]
+ bytes.put_u8(username.len() as u8);
+ bytes.put_slice(username.as_bytes());
+ #[allow(clippy::cast_possible_truncation)]
+ bytes.put_u8(password.len() as u8);
+ bytes.put_slice(password.as_bytes());
+
+ // 1-3 trailing bytes (incomplete u32 length prefix) must be rejected
+ for extra in 1..=3u8 {
+ let mut buf = bytes.clone();
+ for i in 0..extra {
+ buf.put_u8(i);
+ }
+ assert!(
+ LoginUser::from_bytes(buf.freeze()).is_err(),
+ "expected error for {extra} trailing byte(s)"
+ );
+ }
+ }
+
+ #[test]
+ fn from_bytes_should_accept_old_sdk_format_without_version_context() {
+ let username = "user";
+ let password = "secret";
+ let mut bytes = BytesMut::new();
+ #[allow(clippy::cast_possible_truncation)]
+ bytes.put_u8(username.len() as u8);
+ bytes.put_slice(username.as_bytes());
+ #[allow(clippy::cast_possible_truncation)]
+ bytes.put_u8(password.len() as u8);
+ bytes.put_slice(password.as_bytes());
+
+ let command = LoginUser::from_bytes(bytes.freeze()).unwrap();
+ assert_eq!(command.username, username);
+ assert_eq!(command.password, password);
+ assert_eq!(command.version, None);
+ assert_eq!(command.context, None);
+ }
+
#[test]
fn should_be_deserialized_from_bytes() {
let username = "user";
diff --git a/core/common/src/commands/users/update_permissions.rs
b/core/common/src/commands/users/update_permissions.rs
index ff96927dd..00510230a 100644
--- a/core/common/src/commands/users/update_permissions.rs
+++ b/core/common/src/commands/users/update_permissions.rs
@@ -75,19 +75,28 @@ impl BytesSerializable for UpdatePermissions {
let user_id = Identifier::from_bytes(bytes.clone())?;
let mut position = user_id.get_size_bytes().as_bytes_usize();
- let has_permissions = bytes[position];
+ let has_permissions =
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
if has_permissions > 1 {
return Err(IggyError::InvalidCommand);
}
position += 1;
let permissions = if has_permissions == 1 {
- let permissions_length =
- u32::from_le_bytes(bytes[position..position +
4].try_into().unwrap());
+ let permissions_length = u32::from_le_bytes(
+ bytes
+ .get(position..position + 4)
+ .ok_or(IggyError::InvalidCommand)?
+ .try_into()
+ .map_err(|_| IggyError::InvalidNumberEncoding)?,
+ );
position += 4;
- let permissions = Permissions::from_bytes(
- bytes.slice(position..position + permissions_length as usize),
- )?;
+ let end = position
+ .checked_add(permissions_length as usize)
+ .ok_or(IggyError::InvalidCommand)?;
+ if end > bytes.len() {
+ return Err(IggyError::InvalidCommand);
+ }
+ let permissions =
Permissions::from_bytes(bytes.slice(position..end))?;
Some(permissions)
} else {
None
diff --git a/core/common/src/commands/users/update_user.rs
b/core/common/src/commands/users/update_user.rs
index 3e2811915..b6df319a7 100644
--- a/core/common/src/commands/users/update_user.rs
+++ b/core/common/src/commands/users/update_user.rs
@@ -96,32 +96,37 @@ impl BytesSerializable for UpdateUser {
let user_id = Identifier::from_bytes(bytes.clone())?;
let mut position = user_id.get_size_bytes().as_bytes_usize();
- let has_username = bytes[position];
+ let has_username =
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
if has_username > 1 {
return Err(IggyError::InvalidCommand);
}
position += 1;
let username = if has_username == 1 {
- let username_length = bytes[position];
+ let username_length =
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
position += 1;
- let username = from_utf8(&bytes[position..position +
username_length as usize])
- .map_err(|_| IggyError::InvalidUtf8)?
- .to_string();
+ let username = from_utf8(
+ bytes
+ .get(position..position + username_length as usize)
+ .ok_or(IggyError::InvalidCommand)?,
+ )
+ .map_err(|_| IggyError::InvalidUtf8)?
+ .to_string();
position += username_length as usize;
Some(username)
} else {
None
};
- let has_status = bytes[position];
+ let has_status =
*bytes.get(position).ok_or(IggyError::InvalidCommand)?;
if has_status > 1 {
return Err(IggyError::InvalidCommand);
}
let status = if has_status == 1 {
position += 1;
- let status = UserStatus::from_code(bytes[position])?;
+ let status =
+
UserStatus::from_code(*bytes.get(position).ok_or(IggyError::InvalidCommand)?)?;
Some(status)
} else {
None
@@ -180,6 +185,28 @@ mod tests {
assert_eq!(status, command.status.unwrap());
}
+ #[test]
+ fn from_bytes_should_fail_on_empty_input() {
+ assert!(UpdateUser::from_bytes(Bytes::new()).is_err());
+ }
+
+ #[test]
+ fn from_bytes_should_fail_on_truncated_input() {
+ let command = UpdateUser {
+ user_id: Identifier::numeric(1).unwrap(),
+ username: Some("user".to_string()),
+ status: Some(UserStatus::Active),
+ };
+ let bytes = command.to_bytes();
+ for i in 0..bytes.len() - 1 {
+ let truncated = bytes.slice(..i);
+ assert!(
+ UpdateUser::from_bytes(truncated).is_err(),
+ "expected error for truncation at byte {i}"
+ );
+ }
+ }
+
#[test]
fn should_be_deserialized_from_bytes() {
let user_id = Identifier::numeric(1).unwrap();
diff --git a/core/common/src/types/identifier/mod.rs
b/core/common/src/types/identifier/mod.rs
index bd4a5ccc0..82c41eb8a 100644
--- a/core/common/src/types/identifier/mod.rs
+++ b/core/common/src/types/identifier/mod.rs
@@ -187,12 +187,12 @@ impl Identifier {
/// Creates identifier from raw bytes
pub fn from_raw_bytes(bytes: &[u8]) -> Result<Self, IggyError> {
- let kind = IdKind::from_code(bytes[0])?;
- let length = bytes[1];
- let value = bytes[2..2 + length as usize].to_vec();
- if value.len() != length as usize {
- return Err(IggyError::InvalidIdentifier);
- }
+ let kind =
IdKind::from_code(*bytes.first().ok_or(IggyError::InvalidIdentifier)?)?;
+ let length = *bytes.get(1).ok_or(IggyError::InvalidIdentifier)?;
+ let value = bytes
+ .get(2..2 + length as usize)
+ .ok_or(IggyError::InvalidIdentifier)?
+ .to_vec();
let identifier = Identifier {
kind,
@@ -228,16 +228,12 @@ impl BytesSerializable for Identifier {
where
Self: Sized,
{
- if bytes.len() < 3 {
- return Err(IggyError::InvalidIdentifier);
- }
-
- let kind = IdKind::from_code(bytes[0])?;
- let length = bytes[1];
- let value = bytes[2..2 + length as usize].to_vec();
- if value.len() != length as usize {
- return Err(IggyError::InvalidIdentifier);
- }
+ let kind =
IdKind::from_code(*bytes.first().ok_or(IggyError::InvalidIdentifier)?)?;
+ let length = *bytes.get(1).ok_or(IggyError::InvalidIdentifier)?;
+ let value = bytes
+ .get(2..2 + length as usize)
+ .ok_or(IggyError::InvalidIdentifier)?
+ .to_vec();
let identifier = Identifier {
kind,
@@ -382,6 +378,55 @@ mod tests {
assert!(Identifier::named(&"a".repeat(256)).is_err());
}
+ #[test]
+ fn from_bytes_should_fail_on_empty_input() {
+ assert!(Identifier::from_bytes(Bytes::new()).is_err());
+ }
+
+ #[test]
+ fn from_bytes_should_fail_on_truncated_input() {
+ let id = Identifier::numeric(42).unwrap();
+ let bytes = id.to_bytes();
+ for i in 0..bytes.len() - 1 {
+ let truncated = bytes.slice(..i);
+ assert!(
+ Identifier::from_bytes(truncated).is_err(),
+ "expected error for truncation at byte {i}"
+ );
+ }
+ }
+
+ #[test]
+ fn from_bytes_should_fail_on_corrupted_length() {
+ let mut buf = BytesMut::new();
+ buf.put_u8(1); // Numeric kind
+ buf.put_u8(255); // length = 255 but only 2 bytes of value follow
+ buf.put_u16_le(0);
+ assert!(Identifier::from_bytes(buf.freeze()).is_err());
+ }
+
+ #[test]
+ fn from_raw_bytes_should_fail_on_empty_input() {
+ assert!(Identifier::from_raw_bytes(&[]).is_err());
+ }
+
+ #[test]
+ fn from_raw_bytes_should_fail_on_truncated_input() {
+ let id = Identifier::numeric(42).unwrap();
+ let bytes = id.to_bytes();
+ for i in 0..bytes.len() - 1 {
+ assert!(
+ Identifier::from_raw_bytes(&bytes[..i]).is_err(),
+ "expected error for truncation at byte {i}"
+ );
+ }
+ }
+
+ #[test]
+ fn from_raw_bytes_should_fail_on_corrupted_length() {
+ assert!(Identifier::from_raw_bytes(&[1, 255, 0, 0]).is_err());
+ }
+
#[test]
fn numeric_id_should_be_converted_into_identifier_using_trait() {
let id = 1;
diff --git a/core/common/src/types/message/message_view.rs
b/core/common/src/types/message/message_view.rs
index 1f1442ea7..dd3fb6b1a 100644
--- a/core/common/src/types/message/message_view.rs
+++ b/core/common/src/types/message/message_view.rs
@@ -37,17 +37,34 @@ pub struct IggyMessageView<'a> {
impl<'a> IggyMessageView<'a> {
/// Creates a new immutable message view from a buffer.
- pub fn new(buffer: &'a [u8]) -> Self {
+ ///
+ /// Validates that the buffer is large enough to contain the full message
+ /// (header + payload + user headers). All subsequent accessors can use
+ /// direct indexing because this constructor guarantees the bounds.
+ pub fn new(buffer: &'a [u8]) -> Result<Self, IggyError> {
+ if buffer.len() < IGGY_MESSAGE_HEADER_SIZE {
+ return Err(IggyError::InvalidMessagePayloadLength);
+ }
let header_view =
IggyMessageHeaderView::new(&buffer[IGGY_MESSAGE_HEADER_RANGE]);
let payload_len = header_view.payload_length();
+ let user_headers_len = header_view.user_headers_length();
+ let total_size = IGGY_MESSAGE_HEADER_SIZE
+ .checked_add(payload_len)
+ .and_then(|s| s.checked_add(user_headers_len))
+ .ok_or(IggyError::InvalidMessagePayloadLength)?;
+ if buffer.len() < total_size {
+ return Err(IggyError::InvalidMessagePayloadLength);
+ }
let payload_offset = IGGY_MESSAGE_HEADER_SIZE;
- let headers_offset = payload_offset + payload_len;
+ let headers_offset = payload_offset
+ .checked_add(payload_len)
+ .ok_or(IggyError::InvalidMessagePayloadLength)?;
- Self {
+ Ok(Self {
buffer,
payload_offset,
user_headers_offset: headers_offset,
- }
+ })
}
/// Returns an immutable header view.
@@ -57,23 +74,13 @@ impl<'a> IggyMessageView<'a> {
/// Returns an immutable slice of the user headers.
pub fn user_headers(&self) -> Option<&[u8]> {
- if self.header().user_headers_length() > 0 {
- let header_length = self.header().user_headers_length();
- let end_offset = self.user_headers_offset + header_length;
-
- if end_offset <= self.buffer.len() {
- Some(&self.buffer[self.user_headers_offset..end_offset])
- } else {
- tracing::error!(
- "Header length in message exceeds buffer bounds:
length={}, buffer_remaining={}",
- header_length,
- self.buffer.len() - self.user_headers_offset
- );
- None
- }
- } else {
- None
+ let header_length = self.header().user_headers_length();
+ if header_length == 0 {
+ return None;
}
+ // Bounds guaranteed by new() which validates total message size
+ let end_offset = self.user_headers_offset + header_length;
+ Some(&self.buffer[self.user_headers_offset..end_offset])
}
/// Return instantiated user headers map
@@ -108,32 +115,19 @@ impl<'a> IggyMessageView<'a> {
pub fn payload(&self) -> &[u8] {
let header_view = self.header();
let payload_len = header_view.payload_length();
- &self.buffer[self.payload_offset..self.payload_offset + payload_len]
- }
-
- /// Validates that the message view is properly formatted and has valid
data.
- pub fn validate(&self) -> Result<(), IggyError> {
- if self.buffer.len() < IGGY_MESSAGE_HEADER_SIZE {
- return Err(IggyError::InvalidMessagePayloadLength);
- }
-
- let header = self.header();
- let payload_len = header.payload_length();
- let user_headers_len = header.user_headers_length();
- let total_size = IGGY_MESSAGE_HEADER_SIZE + payload_len +
user_headers_len;
-
- if self.buffer.len() < total_size {
- return Err(IggyError::InvalidMessagePayloadLength);
- }
- Ok(())
+ let end = self.payload_offset + payload_len;
+ // Bounds guaranteed by new() which validates total message size
+ &self.buffer[self.payload_offset..end]
}
- /// Validates that the message view has a valid checksum.
+ /// Calculates the checksum over the message (excluding the checksum field
itself).
/// This should be called only on server side.
pub fn calculate_checksum(&self) -> u64 {
- let checksum_field_size = size_of::<u64>(); // Skip checksum field for
checksum calculation
+ let checksum_field_size = size_of::<u64>();
let size = self.size() - checksum_field_size;
- let data = &self.buffer[checksum_field_size..checksum_field_size +
size];
+ let end = checksum_field_size + size;
+ // Bounds guaranteed by new() which validates total message size
+ let data = &self.buffer[checksum_field_size..end];
checksum::calculate_checksum(data)
}
}
@@ -186,7 +180,7 @@ impl<'a> Iterator for IggyMessageViewIterator<'a> {
}
let remaining = &self.buffer[self.position..];
- let view = IggyMessageView::new(remaining);
+ let view = IggyMessageView::new(remaining).ok()?;
self.position += view.size();
Some(view)
}
diff --git a/core/common/src/types/message/messages_batch.rs
b/core/common/src/types/message/messages_batch.rs
index 6bc704866..aa6910b01 100644
--- a/core/common/src/types/message/messages_batch.rs
+++ b/core/common/src/types/message/messages_batch.rs
@@ -166,11 +166,8 @@ impl IggyMessagesBatch {
/// Get the message at the specified index.
/// Returns None if the index is out of bounds.
pub fn get(&self, index: usize) -> Option<IggyMessageView<'_>> {
- if let Some((start, end)) = self.get_message_boundaries(index) {
- Some(IggyMessageView::new(&self.messages[start..end]))
- } else {
- None
- }
+ let (start, end) = self.get_message_boundaries(index)?;
+ IggyMessageView::new(&self.messages[start..end]).ok()
}
}
diff --git a/core/common/src/types/message/messages_batch_mut.rs
b/core/common/src/types/message/messages_batch_mut.rs
index 3857f2667..55e33bf27 100644
--- a/core/common/src/types/message/messages_batch_mut.rs
+++ b/core/common/src/types/message/messages_batch_mut.rs
@@ -199,7 +199,7 @@ impl IggyMessagesBatchMut {
if self.is_empty() {
return None;
}
- Some(IggyMessageView::new(&self.messages).header().offset())
+ Some(IggyMessageView::new(&self.messages).ok()?.header().offset())
}
/// Returns the first timestamp in the batch
@@ -207,7 +207,12 @@ impl IggyMessagesBatchMut {
if self.is_empty() {
return None;
}
- Some(IggyMessageView::new(&self.messages).header().timestamp())
+ Some(
+ IggyMessageView::new(&self.messages)
+ .ok()?
+ .header()
+ .timestamp(),
+ )
}
/// Returns the last timestamp in the batch
@@ -217,11 +222,13 @@ impl IggyMessagesBatchMut {
}
let last_index = self.count() as usize - 1;
- self.get_message_boundaries(last_index).map(|(start, _)| {
- IggyMessageView::new(&self.messages[start..])
+ let (start, end) = self.get_message_boundaries(last_index)?;
+ Some(
+ IggyMessageView::new(&self.messages[start..end])
+ .ok()?
.header()
- .timestamp()
- })
+ .timestamp(),
+ )
}
/// Returns the last offset in the batch
@@ -230,11 +237,13 @@ impl IggyMessagesBatchMut {
return None;
}
let last_index = self.count() as usize - 1;
- self.get_message_boundaries(last_index).map(|(start, _)| {
- IggyMessageView::new(&self.messages[start..])
+ let (start, end) = self.get_message_boundaries(last_index)?;
+ Some(
+ IggyMessageView::new(&self.messages[start..end])
+ .ok()?
.header()
- .offset()
- })
+ .offset(),
+ )
}
/// Checks if the batch is empty.
@@ -472,8 +481,8 @@ impl IggyMessagesBatchMut {
/// Get the message at the specified index.
/// Returns None if the index is out of bounds or the message cannot be
found.
pub fn get(&self, index: usize) -> Option<IggyMessageView<'_>> {
- self.get_message_boundaries(index)
- .map(|(start, end)|
IggyMessageView::new(&self.messages[start..end]))
+ let (start, end) = self.get_message_boundaries(index)?;
+ IggyMessageView::new(&self.messages[start..end]).ok()
}
/// This helper function is used to parse newly appended chunks in the
`new_buffer`.
@@ -494,7 +503,10 @@ impl IggyMessagesBatchMut {
let mut current = chunk_start;
while current < chunk_end {
- let view = IggyMessageView::new(&new_buffer[current..]);
+ let Ok(view) = IggyMessageView::new(&new_buffer[current..]) else {
+ error!("Corrupt message in already-validated chunk at offset
{current}");
+ break;
+ };
let msg_size = view.size();
*offset_in_new_buffer += msg_size as u32;
new_indexes.insert(0, *offset_in_new_buffer, 0);
diff --git a/core/common/src/types/message/partitioning.rs
b/core/common/src/types/message/partitioning.rs
index 78efab2ce..f0b2af3b0 100644
--- a/core/common/src/types/message/partitioning.rs
+++ b/core/common/src/types/message/partitioning.rs
@@ -49,12 +49,13 @@ impl Display for Partitioning {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.kind {
PartitioningKind::Balanced => write!(f, "{}|0", self.kind),
- PartitioningKind::PartitionId => write!(
- f,
- "{}|{}",
- self.kind,
- u32::from_le_bytes(self.value[..4].try_into().unwrap())
- ),
+ PartitioningKind::PartitionId => {
+ if let Some(bytes) = self.value.get(..4).and_then(|s|
s.try_into().ok()) {
+ write!(f, "{}|{}", self.kind, u32::from_le_bytes(bytes))
+ } else {
+ write!(f, "{}|<invalid>", self.kind)
+ }
+ }
PartitioningKind::MessagesKey => {
write!(f, "{}|{}", self.kind,
String::from_utf8_lossy(&self.value))
}
@@ -137,15 +138,22 @@ impl Partitioning {
}
}
- /// Create the partitioning from BytesMut.
+ /// Create the partitioning from a raw byte slice.
pub fn from_raw_bytes(bytes: &[u8]) -> Result<Self, IggyError> {
- let kind = PartitioningKind::from_code(bytes[0])?;
- let length = bytes[1];
- let value = bytes[2..2 + length as usize].to_vec();
- if value.len() != length as usize {
+ let kind =
PartitioningKind::from_code(*bytes.first().ok_or(IggyError::InvalidCommand)?)?;
+ let length = *bytes.get(1).ok_or(IggyError::InvalidCommand)?;
+ if kind == PartitioningKind::PartitionId && length != 4 {
+ return Err(IggyError::InvalidCommand);
+ }
+ if kind == PartitioningKind::Balanced && length != 0 {
return Err(IggyError::InvalidCommand);
}
+ let value = bytes
+ .get(2..2 + length as usize)
+ .ok_or(IggyError::InvalidCommand)?
+ .to_vec();
+
Ok(Partitioning {
kind,
length,
@@ -192,22 +200,7 @@ impl BytesSerializable for Partitioning {
where
Self: Sized,
{
- if bytes.len() < 2 {
- return Err(IggyError::InvalidCommand);
- }
-
- let kind = PartitioningKind::from_code(bytes[0])?;
- let length = bytes[1];
- let value = bytes[2..2 + length as usize].to_vec();
- if value.len() != length as usize {
- return Err(IggyError::InvalidCommand);
- }
-
- Ok(Partitioning {
- kind,
- length,
- value,
- })
+ Self::from_raw_bytes(&bytes)
}
fn write_to_buffer(&self, bytes: &mut BytesMut) {
@@ -220,3 +213,103 @@ impl BytesSerializable for Partitioning {
2 + self.length as usize
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn from_raw_bytes_should_reject_partition_id_with_wrong_length() {
+ for bad_len in [0u8, 1, 2, 3, 5, 255] {
+ let mut buf = vec![PartitioningKind::PartitionId.as_code(),
bad_len];
+ buf.extend(vec![0u8; bad_len as usize]);
+ assert!(
+ Partitioning::from_raw_bytes(&buf).is_err(),
+ "expected error for PartitionId with length={bad_len}"
+ );
+ }
+ }
+
+ #[test]
+ fn from_raw_bytes_should_accept_valid_partition_id() {
+ let mut buf = vec![PartitioningKind::PartitionId.as_code(), 4];
+ buf.extend(42u32.to_le_bytes());
+ let p = Partitioning::from_raw_bytes(&buf).unwrap();
+ assert_eq!(p.kind, PartitioningKind::PartitionId);
+ assert_eq!(p.length, 4);
+ assert_eq!(p.value, 42u32.to_le_bytes());
+ }
+
+ #[test]
+ fn from_bytes_should_reject_partition_id_with_wrong_length() {
+ for bad_len in [0u8, 1, 2, 3, 5] {
+ let mut buf = BytesMut::new();
+ buf.put_u8(PartitioningKind::PartitionId.as_code());
+ buf.put_u8(bad_len);
+ buf.extend(vec![0u8; bad_len as usize]);
+ assert!(
+ Partitioning::from_bytes(buf.freeze()).is_err(),
+ "expected error for PartitionId with length={bad_len}"
+ );
+ }
+ }
+
+ #[test]
+ fn from_raw_bytes_should_fail_on_truncated_input() {
+ let p = Partitioning::partition_id(99);
+ let full = p.to_bytes();
+ for i in 0..full.len() {
+ assert!(
+ Partitioning::from_raw_bytes(&full[..i]).is_err(),
+ "expected error for truncation at byte {i}"
+ );
+ }
+ }
+
+ #[test]
+ fn display_should_not_panic_on_malformed_partition_id() {
+ let malformed = Partitioning {
+ kind: PartitioningKind::PartitionId,
+ length: 0,
+ value: vec![],
+ };
+ let s = format!("{malformed}");
+ assert!(s.contains("<invalid>"));
+ }
+
+ #[test]
+ fn round_trip_partition_id() {
+ let original = Partitioning::partition_id(12345);
+ let bytes = original.to_bytes();
+ let restored = Partitioning::from_bytes(bytes).unwrap();
+ assert_eq!(original, restored);
+ }
+
+ #[test]
+ fn from_raw_bytes_should_reject_balanced_with_nonzero_length() {
+ for bad_len in [1u8, 2, 4, 255] {
+ let mut buf = vec![PartitioningKind::Balanced.as_code(), bad_len];
+ buf.extend(vec![0u8; bad_len as usize]);
+ assert!(
+ Partitioning::from_raw_bytes(&buf).is_err(),
+ "expected error for Balanced with length={bad_len}"
+ );
+ }
+ }
+
+ #[test]
+ fn round_trip_balanced() {
+ let original = Partitioning::balanced();
+ let bytes = original.to_bytes();
+ let restored = Partitioning::from_bytes(bytes).unwrap();
+ assert_eq!(original, restored);
+ }
+
+ #[test]
+ fn round_trip_messages_key() {
+ let original = Partitioning::messages_key(b"my-key").unwrap();
+ let bytes = original.to_bytes();
+ let restored = Partitioning::from_bytes(bytes).unwrap();
+ assert_eq!(original, restored);
+ }
+}
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs
b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 713d64ab2..7bac42805 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -54,14 +54,20 @@ impl ServerCommandHandler for SendMessages {
shard: &Rc<IggyShard>,
) -> Result<HandlerResult, IggyError> {
shard.ensure_authenticated(session)?;
- let total_payload_size = length as usize - std::mem::size_of::<u32>();
+ let total_payload_size = (length as usize)
+ .checked_sub(std::mem::size_of::<u32>())
+ .ok_or(IggyError::InvalidCommand)?;
let metadata_len_field_size = std::mem::size_of::<u32>();
let metadata_length_buffer = PooledBuffer::with_capacity(4);
let (result, metadata_len_buf) =
sender.read(metadata_length_buffer.slice(0..4)).await;
let metadata_len_buf = metadata_len_buf.into_inner();
result?;
- let metadata_size =
u32::from_le_bytes(metadata_len_buf[..].try_into().unwrap());
+ let metadata_size = u32::from_le_bytes(
+ metadata_len_buf[..]
+ .try_into()
+ .map_err(|_| IggyError::InvalidNumberEncoding)?,
+ );
let metadata_buffer = PooledBuffer::with_capacity(metadata_size as
usize);
let (result, metadata_buf) = sender
@@ -76,28 +82,46 @@ impl ServerCommandHandler for SendMessages {
element_size += stream_id.get_size_bytes().as_bytes_usize();
self.stream_id = stream_id;
- let topic_id =
Identifier::from_raw_bytes(&metadata_buf[element_size..])?;
+ let topic_id = Identifier::from_raw_bytes(
+ metadata_buf
+ .get(element_size..)
+ .ok_or(IggyError::InvalidCommand)?,
+ )?;
element_size += topic_id.get_size_bytes().as_bytes_usize();
self.topic_id = topic_id;
- let partitioning =
Partitioning::from_raw_bytes(&metadata_buf[element_size..])?;
+ let partitioning = Partitioning::from_raw_bytes(
+ metadata_buf
+ .get(element_size..)
+ .ok_or(IggyError::InvalidCommand)?,
+ )?;
element_size += partitioning.get_size_bytes().as_bytes_usize();
self.partitioning = partitioning;
let messages_count = u32::from_le_bytes(
- metadata_buf[element_size..element_size + 4]
+ metadata_buf
+ .get(element_size..element_size + 4)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
- .unwrap(),
+ .map_err(|_| IggyError::InvalidNumberEncoding)?,
);
- let indexes_size = messages_count as usize * INDEX_SIZE;
+ let indexes_size = (messages_count as usize)
+ .checked_mul(INDEX_SIZE)
+ .ok_or(IggyError::InvalidCommand)?;
+ if indexes_size > total_payload_size {
+ return Err(IggyError::InvalidCommand);
+ }
let indexes_buffer = PooledBuffer::with_capacity(indexes_size);
let (result, indexes_buffer) =
sender.read(indexes_buffer.slice(0..indexes_size)).await;
result?;
let indexes_buffer = indexes_buffer.into_inner();
- let messages_size =
- total_payload_size - metadata_size as usize - indexes_size -
metadata_len_field_size;
+ let messages_size = total_payload_size
+ .checked_sub(metadata_size as usize)
+ .and_then(|s| s.checked_sub(indexes_size))
+ .and_then(|s| s.checked_sub(metadata_len_field_size))
+ .ok_or(IggyError::InvalidCommand)?;
let messages_buffer = PooledBuffer::with_capacity(messages_size);
let (result, messages_buffer) =
sender.read(messages_buffer.slice(0..messages_size)).await;
result?;
@@ -122,7 +146,10 @@ impl ServerCommandHandler for SendMessages {
self.topic_id.clone(),
))?,
PartitioningKind::PartitionId => u32::from_le_bytes(
- self.partitioning.value[..self.partitioning.length as usize]
+ self.partitioning
+ .value
+ .get(..4)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
) as usize,
diff --git a/core/server/src/http/http_shard_wrapper.rs
b/core/server/src/http/http_shard_wrapper.rs
index c89b906b5..9952e4de7 100644
--- a/core/server/src/http/http_shard_wrapper.rs
+++ b/core/server/src/http/http_shard_wrapper.rs
@@ -179,7 +179,10 @@ impl HttpSafeShard {
.get_next_partition_id(topic.stream_id, topic.topic_id)
.ok_or(IggyError::TopicIdNotFound(stream_id, topic_id))?,
PartitioningKind::PartitionId => u32::from_le_bytes(
- partitioning.value[..partitioning.length as usize]
+ partitioning
+ .value
+ .get(..4)
+ .ok_or(IggyError::InvalidCommand)?
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
) as usize,