This is an automated email from the ASF dual-hosted git repository.
hubcio pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 645a4e9d4 fix(go): add bounds checking to DeserializeStreams for payloads > 64KB (#3165)
645a4e9d4 is described below
commit 645a4e9d4143d43bd2ddca023b50172eb806ef1a
Author: Atharva Lade <[email protected]>
AuthorDate: Mon May 18 16:08:41 2026 -0500
fix(go): add bounds checking to DeserializeStreams for payloads > 64KB (#3165)
---
.../binary_response_deserializer.go | 39 +++--
.../binary_response_deserializer_test.go | 160 +++++++++++++++++++++
foreign/go/client/tcp/tcp_stream_management.go | 2 +-
3 files changed, 189 insertions(+), 12 deletions(-)
diff --git a/foreign/go/binary_serialization/binary_response_deserializer.go b/foreign/go/binary_serialization/binary_response_deserializer.go
index 3de147010..fa2bb938b 100644
--- a/foreign/go/binary_serialization/binary_response_deserializer.go
+++ b/foreign/go/binary_serialization/binary_response_deserializer.go
@@ -53,7 +53,10 @@ func DeserializeOffset(payload []byte)
*iggcon.ConsumerOffsetInfo {
}
func DeserializeStream(payload []byte) (*iggcon.StreamDetails, error) {
- stream, pos := DeserializeToStream(payload, 0)
+ stream, pos, err := DeserializeToStream(payload, 0)
+ if err != nil {
+ return nil, err
+ }
topics := make([]iggcon.Topic, 0)
for pos < len(payload) {
topic, readBytes, err := DeserializeToTopic(payload, pos)
@@ -74,22 +77,32 @@ func DeserializeStream(payload []byte)
(*iggcon.StreamDetails, error) {
}, nil
}
-func DeserializeStreams(payload []byte) []iggcon.Stream {
+func DeserializeStreams(payload []byte) ([]iggcon.Stream, error) {
streams := make([]iggcon.Stream, 0)
position := 0
- //TODO there's a deserialization bug, investigate this
- //it occurs only with payload greater than 2 pow 16
for position < len(payload) {
- stream, readBytes := DeserializeToStream(payload, position)
+ stream, readBytes, err := DeserializeToStream(payload, position)
+ if err != nil {
+ return nil, fmt.Errorf("failed to deserialize stream at
offset %d: %w", position, err)
+ }
streams = append(streams, stream)
position += readBytes
}
- return streams
+ return streams, nil
}
-func DeserializeToStream(payload []byte, position int) (iggcon.Stream, int) {
+const streamFixedSize = 4 + 8 + 4 + 8 + 8 + 1 // 33 bytes: id + created_at +
topics_count + size_bytes + messages_count + name_len
+
+func DeserializeToStream(payload []byte, position int) (iggcon.Stream, int,
error) {
+ remaining := len(payload) - position
+ if remaining < streamFixedSize {
+ return iggcon.Stream{}, 0, fmt.Errorf(
+ "not enough data to read stream header: need %d bytes,
got %d",
+ streamFixedSize, remaining)
+ }
+
id := binary.LittleEndian.Uint32(payload[position : position+4])
createdAt := binary.LittleEndian.Uint64(payload[position+4 :
position+12])
topicsCount := binary.LittleEndian.Uint32(payload[position+12 :
position+16])
@@ -97,10 +110,14 @@ func DeserializeToStream(payload []byte, position int)
(iggcon.Stream, int) {
messagesCount := binary.LittleEndian.Uint64(payload[position+24 :
position+32])
nameLength := int(payload[position+32])
- nameBytes := payload[position+33 : position+33+nameLength]
- name := string(nameBytes)
+ totalSize := streamFixedSize + nameLength
+ if remaining < totalSize {
+ return iggcon.Stream{}, 0, fmt.Errorf(
+ "not enough data to read stream name: need %d bytes,
got %d",
+ totalSize, remaining)
+ }
- readBytes := 4 + 8 + 4 + 8 + 8 + 1 + nameLength
+ name := string(payload[position+33 : position+33+nameLength])
return iggcon.Stream{
Id: id,
@@ -109,7 +126,7 @@ func DeserializeToStream(payload []byte, position int)
(iggcon.Stream, int) {
SizeBytes: sizeBytes,
MessagesCount: messagesCount,
CreatedAt: createdAt,
- }, readBytes
+ }, totalSize, nil
}
func DeserializeFetchMessagesResponse(payload []byte, compression
iggcon.IggyMessageCompression) (*iggcon.PolledMessage, error) {
diff --git a/foreign/go/binary_serialization/binary_response_deserializer_test.go b/foreign/go/binary_serialization/binary_response_deserializer_test.go
index 1b4c0c023..9165c9946 100644
--- a/foreign/go/binary_serialization/binary_response_deserializer_test.go
+++ b/foreign/go/binary_serialization/binary_response_deserializer_test.go
@@ -19,6 +19,7 @@ package binaryserialization
import (
"encoding/binary"
+ "fmt"
"strings"
"testing"
@@ -86,3 +87,162 @@ func TestDeserializeFetchMessages_EmptyPayload(t
*testing.T) {
t.Fatalf("expected 0 messages, got %d", len(result.Messages))
}
}
+
+func encodeStream(id uint32, createdAt uint64, topicsCount uint32, sizeBytes,
messagesCount uint64, name string) []byte {
+ nameBytes := []byte(name)
+ buf := make([]byte, streamFixedSize+len(nameBytes))
+ binary.LittleEndian.PutUint32(buf[0:4], id)
+ binary.LittleEndian.PutUint64(buf[4:12], createdAt)
+ binary.LittleEndian.PutUint32(buf[12:16], topicsCount)
+ binary.LittleEndian.PutUint64(buf[16:24], sizeBytes)
+ binary.LittleEndian.PutUint64(buf[24:32], messagesCount)
+ buf[32] = byte(len(nameBytes))
+ copy(buf[33:], nameBytes)
+ return buf
+}
+
+func assertStream(t *testing.T, label string, got iggcon.Stream, wantId
uint32, wantCreatedAt uint64, wantTopicsCount uint32, wantSizeBytes,
wantMessagesCount uint64, wantName string) {
+ t.Helper()
+ if got.Id != wantId {
+ t.Errorf("%s.Id = %d, want %d", label, got.Id, wantId)
+ }
+ if got.CreatedAt != wantCreatedAt {
+ t.Errorf("%s.CreatedAt = %d, want %d", label, got.CreatedAt,
wantCreatedAt)
+ }
+ if got.TopicsCount != wantTopicsCount {
+ t.Errorf("%s.TopicsCount = %d, want %d", label,
got.TopicsCount, wantTopicsCount)
+ }
+ if got.SizeBytes != wantSizeBytes {
+ t.Errorf("%s.SizeBytes = %d, want %d", label, got.SizeBytes,
wantSizeBytes)
+ }
+ if got.MessagesCount != wantMessagesCount {
+ t.Errorf("%s.MessagesCount = %d, want %d", label,
got.MessagesCount, wantMessagesCount)
+ }
+ if got.Name != wantName {
+ t.Errorf("%s.Name = %q, want %q", label, got.Name, wantName)
+ }
+}
+
+func TestDeserializeToStream_SingleStream(t *testing.T) {
+ payload := encodeStream(42, 1_710_000_000, 5, 2048, 100, "my-stream")
+
+ stream, readBytes, err := DeserializeToStream(payload, 0)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if readBytes != len(payload) {
+ t.Fatalf("readBytes = %d, want %d", readBytes, len(payload))
+ }
+ assertStream(t, "stream", stream, 42, 1_710_000_000, 5, 2048, 100,
"my-stream")
+}
+
+func TestDeserializeToStream_TruncatedHeader(t *testing.T) {
+ payload := make([]byte, streamFixedSize-1)
+ _, _, err := DeserializeToStream(payload, 0)
+ if err == nil {
+ t.Fatal("expected error for truncated header, got nil")
+ }
+}
+
+func TestDeserializeToStream_TruncatedName(t *testing.T) {
+ buf := make([]byte, streamFixedSize)
+ buf[32] = 10
+ _, _, err := DeserializeToStream(buf, 0)
+ if err == nil {
+ t.Fatal("expected error for truncated name, got nil")
+ }
+}
+
+func TestDeserializeStreams_Empty(t *testing.T) {
+ streams, err := DeserializeStreams([]byte{})
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if len(streams) != 0 {
+ t.Fatalf("expected 0 streams, got %d", len(streams))
+ }
+}
+
+func TestDeserializeStreams_MultipleStreams(t *testing.T) {
+ var payload []byte
+ payload = append(payload, encodeStream(1, 100, 2, 512, 50,
"stream-one")...)
+ payload = append(payload, encodeStream(2, 200, 0, 0, 0, "s2")...)
+ payload = append(payload, encodeStream(3, 300, 1, 1024, 10, "third")...)
+
+ streams, err := DeserializeStreams(payload)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if len(streams) != 3 {
+ t.Fatalf("expected 3 streams, got %d", len(streams))
+ }
+
+ assertStream(t, "stream[0]", streams[0], 1, 100, 2, 512, 50,
"stream-one")
+ assertStream(t, "stream[1]", streams[1], 2, 200, 0, 0, 0, "s2")
+ assertStream(t, "stream[2]", streams[2], 3, 300, 1, 1024, 10, "third")
+}
+
+func TestDeserializeStreams_CorruptedPayload(t *testing.T) {
+ good := encodeStream(1, 100, 0, 0, 0, "ok")
+ truncated := make([]byte, streamFixedSize-5)
+ payload := append(good, truncated...)
+
+ _, err := DeserializeStreams(payload)
+ if err == nil {
+ t.Fatal("expected error for corrupted payload, got nil")
+ }
+}
+
+// Regression test for issue #3130: payloads > 64KB produced corrupted
+// stream lists because no bounds checking was performed.
+func TestDeserializeStreams_LargePayloadOver64KB(t *testing.T) {
+ const targetSize = 70_000
+ var payload []byte
+ var id uint32
+
+ for len(payload) < targetSize {
+ id++
+ name := fmt.Sprintf("stream-with-a-longer-name-for-padding-%d",
id)
+ payload = append(payload, encodeStream(id, uint64(id)*1000,
id%10, uint64(id)*512, uint64(id)*5, name)...)
+ }
+
+ if len(payload) <= 1<<16 {
+ t.Fatalf("payload size %d is not > 64KB; increase stream count
or name length", len(payload))
+ }
+
+ streams, err := DeserializeStreams(payload)
+ if err != nil {
+ t.Fatalf("unexpected error deserializing %d-byte payload: %v",
len(payload), err)
+ }
+
+ if uint32(len(streams)) != id {
+ t.Fatalf("expected %d streams, got %d", id, len(streams))
+ }
+
+ for i, s := range streams {
+ expectedId := uint32(i + 1)
+ expectedName :=
fmt.Sprintf("stream-with-a-longer-name-for-padding-%d", expectedId)
+ assertStream(t, fmt.Sprintf("stream[%d]", i), s,
+ expectedId, uint64(expectedId)*1000, expectedId%10,
+ uint64(expectedId)*512, uint64(expectedId)*5,
expectedName)
+ }
+}
+
+func TestDeserializeStreams_MaxLengthName(t *testing.T) {
+ name := make([]byte, 255)
+ for i := range name {
+ name[i] = 'a' + byte(i%26)
+ }
+ payload := encodeStream(1, 999, 3, 4096, 200, string(name))
+
+ streams, err := DeserializeStreams(payload)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if len(streams) != 1 {
+ t.Fatalf("expected 1 stream, got %d", len(streams))
+ }
+ if streams[0].Name != string(name) {
+ t.Errorf("Name length = %d, want 255", len(streams[0].Name))
+ }
+}
diff --git a/foreign/go/client/tcp/tcp_stream_management.go b/foreign/go/client/tcp/tcp_stream_management.go
index 9fbba5b4c..932c6ac4e 100644
--- a/foreign/go/client/tcp/tcp_stream_management.go
+++ b/foreign/go/client/tcp/tcp_stream_management.go
@@ -30,7 +30,7 @@ func (c *IggyTcpClient) GetStreams() ([]iggcon.Stream, error)
{
return nil, err
}
- return binaryserialization.DeserializeStreams(buffer), nil
+ return binaryserialization.DeserializeStreams(buffer)
}
func (c *IggyTcpClient) GetStream(streamId iggcon.Identifier)
(*iggcon.StreamDetails, error) {