This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 1c9f2715 refactor(foreign/go): replace the previous implementation of
IggyMessage to option pattern (#1904)
1c9f2715 is described below
commit 1c9f27154082afb1534a8b5c80e87ebb58770ea4
Author: Chengxi <[email protected]>
AuthorDate: Tue Jun 24 14:00:32 2025 -0400
refactor(foreign/go): replace the previous implementation of IggyMessage to
option pattern (#1904)
---
.../go/benchmarks/send_messages_benchmark_test.go | 6 +-
.../send_messages_request_serializer_test.go | 8 +--
foreign/go/contracts/message_header.go | 26 ++++----
foreign/go/contracts/messages.go | 77 ++++++++++++++++++----
foreign/go/e2e/tcp_test/messages_steps.go | 9 +--
foreign/go/errors/constants.go | 20 ++++--
foreign/go/samples/producer/producer.go | 2 +-
7 files changed, 105 insertions(+), 43 deletions(-)
diff --git a/foreign/go/benchmarks/send_messages_benchmark_test.go
b/foreign/go/benchmarks/send_messages_benchmark_test.go
index 74261c1a..a8946be5 100644
--- a/foreign/go/benchmarks/send_messages_benchmark_test.go
+++ b/foreign/go/benchmarks/send_messages_benchmark_test.go
@@ -148,7 +148,11 @@ func CreateMessages(messagesCount, messageSize int)
[]iggcon.IggyMessage {
}
id, _ := uuid.NewUUID()
- messages[i] = iggcon.NewIggyMessage(id, payload)
+ var err error
+ messages[i], err = iggcon.NewIggyMessage(payload,
iggcon.WithID(id))
+ if err != nil {
+ panic(err)
+ }
}
return messages
}
diff --git
a/foreign/go/binary_serialization/send_messages_request_serializer_test.go
b/foreign/go/binary_serialization/send_messages_request_serializer_test.go
index f28ff19d..c28c7c6f 100644
--- a/foreign/go/binary_serialization/send_messages_request_serializer_test.go
+++ b/foreign/go/binary_serialization/send_messages_request_serializer_test.go
@@ -74,9 +74,9 @@ func createDefaultMessageHeaders()
map[iggcon.HeaderKey]iggcon.HeaderValue {
}
func generateTestMessage(payload string) iggcon.IggyMessage {
- return iggcon.NewIggyMessageWithHeaders(
- uuid.New(),
+ msg, _ := iggcon.NewIggyMessage(
[]byte(payload),
- createDefaultMessageHeaders(),
- )
+ iggcon.WithID(uuid.New()),
+ iggcon.WithUserHeaders(createDefaultMessageHeaders()))
+ return msg
}
diff --git a/foreign/go/contracts/message_header.go
b/foreign/go/contracts/message_header.go
index c0c76df3..30b630f3 100644
--- a/foreign/go/contracts/message_header.go
+++ b/foreign/go/contracts/message_header.go
@@ -21,15 +21,15 @@ import (
"encoding/binary"
"errors"
"time"
-
- "github.com/google/uuid"
)
const MessageHeaderSize = 8 + 16 + 8 + 8 + 8 + 4 + 4
+type MessageID [16]byte
+
type MessageHeader struct {
Checksum uint64 `json:"checksum"`
- Id uuid.UUID `json:"id"`
+ Id MessageID `json:"id"`
Offset uint64 `json:"offset"`
Timestamp uint64 `json:"timestamp"`
OriginTimestamp uint64 `json:"origin_timestamp"`
@@ -37,7 +37,7 @@ type MessageHeader struct {
PayloadLength uint32 `json:"payload_length"`
}
-func NewMessageHeader(id uuid.UUID, payloadLength uint32, userHeaderLength
uint32) MessageHeader {
+func NewMessageHeader(id MessageID, payloadLength uint32, userHeaderLength
uint32) MessageHeader {
return MessageHeader{
Id: id,
OriginTimestamp: uint64(time.Now().UnixMicro()),
@@ -52,21 +52,21 @@ func MessageHeaderFromBytes(data []byte) (*MessageHeader,
error) {
return nil, errors.New("data has incorrect size, must be 56")
}
checksum := binary.LittleEndian.Uint64(data[0:8])
- id, _ := uuid.FromBytes(data[8:24])
+ id := data[8:24]
timestamp := binary.LittleEndian.Uint64(data[24:32])
- origin_timestamp := binary.LittleEndian.Uint64(data[32:40])
+ originTimestamp := binary.LittleEndian.Uint64(data[32:40])
offset := binary.LittleEndian.Uint64(data[40:48])
- user_header_length := binary.LittleEndian.Uint32(data[48:52])
- payload_length := binary.LittleEndian.Uint32(data[52:56])
+ userHeaderLength := binary.LittleEndian.Uint32(data[48:52])
+ payloadLength := binary.LittleEndian.Uint32(data[52:56])
return &MessageHeader{
Checksum: checksum,
- Id: id,
+ Id: MessageID(id),
Offset: offset,
Timestamp: timestamp,
- OriginTimestamp: origin_timestamp,
- UserHeaderLength: user_header_length,
- PayloadLength: payload_length,
+ OriginTimestamp: originTimestamp,
+ UserHeaderLength: userHeaderLength,
+ PayloadLength: payloadLength,
}, nil
}
@@ -74,7 +74,7 @@ func (mh *MessageHeader) ToBytes() []byte {
bytes := make([]byte, 0, MessageHeaderSize)
bytes = binary.LittleEndian.AppendUint64(bytes, mh.Checksum)
- idBytes, _ := uuid.UUID.MarshalBinary(mh.Id)
+ idBytes := mh.Id[:]
bytes = append(bytes, idBytes...)
bytes = binary.LittleEndian.AppendUint64(bytes, mh.Offset)
bytes = binary.LittleEndian.AppendUint64(bytes, mh.Timestamp)
diff --git a/foreign/go/contracts/messages.go b/foreign/go/contracts/messages.go
index b4ebe657..d08638b3 100644
--- a/foreign/go/contracts/messages.go
+++ b/foreign/go/contracts/messages.go
@@ -18,7 +18,31 @@
package iggcon
import (
- "github.com/google/uuid"
+ ierror "github.com/iggy-rs/iggy-go-client/errors"
+)
+
+const (
+ // MaxPayloadSize is maximum allowed size in bytes for a message
payload.
+ //
+ // This constant defines the upper limit for the size of an IggyMessage
payload. Attempting to create a message
+ // with a payload larger than this value will result
+ // in an ierror.TooBigUserMessagePayload error.
+ //
+ // Constraints
+ // - Minimum payload size: 1 byte (empty payloads are not allowed)
+ // - Maximum payload size: 10 MB
+ MaxPayloadSize = 10 * 1000 * 1000
+
+ // MaxUserHeadersSize is maximum allowed size in bytes for user-defined
headers.
+ //
+ // This constant defines the upper limit for the combined size of all
user headers in an IggyMessage. Attempting to
+ // create a message with user headers larger than this value will
result in an ierror.TooBigUserHeaders error.
+ //
+ // Constraints
+ // - Maximum headers size: 100 KB
+ // - Each individual header key is limited to 255 bytes
+ // - Each individual header value is limited to 255 bytes
+ MaxUserHeadersSize = 100 * 1000
)
type FetchMessagesRequest struct {
@@ -57,21 +81,46 @@ type IggyMessage struct {
UserHeaders []byte
}
-func NewIggyMessage(id uuid.UUID, payload []byte) IggyMessage {
- return IggyMessage{
- Header: NewMessageHeader(id, uint32(len(payload)), 0),
- Payload: payload,
+type IggyMessageOpt func(message *IggyMessage)
+
+// NewIggyMessage Creates a new message with customizable parameters.
+func NewIggyMessage(payload []byte, opts ...IggyMessageOpt) (IggyMessage,
error) {
+ if len(payload) == 0 {
+ return IggyMessage{}, ierror.InvalidMessagePayloadLength
+ }
+
+ if len(payload) > MaxPayloadSize {
+ return IggyMessage{}, ierror.TooBigUserMessagePayload
}
-}
-func NewIggyMessageWithHeaders(id uuid.UUID, payload []byte, userHeaders
map[HeaderKey]HeaderValue) IggyMessage {
- userHeaderBytes := GetHeadersBytes(userHeaders)
- messageHeader := NewMessageHeader(id, uint32(len(payload)), 0)
- messageHeader.UserHeaderLength = uint32(len(userHeaderBytes))
- iggyMessage := IggyMessage{
- Header: messageHeader,
+ header := NewMessageHeader(MessageID{}, uint32(len(payload)), 0)
+ message := IggyMessage{
+ Header: header,
Payload: payload,
- UserHeaders: userHeaderBytes,
+ UserHeaders: make([]byte, 0),
+ }
+ for _, opt := range opts {
+ if opt != nil {
+ opt(&message)
+ }
+ }
+ userHeaderLength := len(message.UserHeaders)
+ if userHeaderLength > MaxUserHeadersSize {
+ return IggyMessage{}, ierror.TooBigUserHeaders
+ }
+ message.Header.UserHeaderLength = uint32(userHeaderLength)
+ return message, nil
+}
+
+func WithID(id [16]byte) IggyMessageOpt {
+ return func(m *IggyMessage) {
+ m.Header.Id = id
+ }
+}
+
+func WithUserHeaders(userHeaders map[HeaderKey]HeaderValue) IggyMessageOpt {
+ return func(m *IggyMessage) {
+ userHeaderBytes := GetHeadersBytes(userHeaders)
+ m.UserHeaders = userHeaderBytes
}
- return iggyMessage
}
diff --git a/foreign/go/e2e/tcp_test/messages_steps.go
b/foreign/go/e2e/tcp_test/messages_steps.go
index 56f2834d..940e82cd 100644
--- a/foreign/go/e2e/tcp_test/messages_steps.go
+++ b/foreign/go/e2e/tcp_test/messages_steps.go
@@ -37,12 +37,9 @@ func createDefaultMessageHeaders()
map[iggcon.HeaderKey]iggcon.HeaderValue {
func createDefaultMessages() []iggcon.IggyMessage {
headers := createDefaultMessageHeaders()
- messages := []iggcon.IggyMessage{
- iggcon.NewIggyMessageWithHeaders(uuid.New(),
[]byte(createRandomString(256)), headers),
- iggcon.NewIggyMessageWithHeaders(uuid.New(),
[]byte(createRandomString(256)), headers),
- }
-
- return messages
+ msg1, _ := iggcon.NewIggyMessage([]byte(createRandomString(256)),
iggcon.WithID(uuid.New()), iggcon.WithUserHeaders(headers))
+ msg2, _ := iggcon.NewIggyMessage([]byte(createRandomString(256)),
iggcon.WithID(uuid.New()), iggcon.WithUserHeaders(headers))
+ return []iggcon.IggyMessage{msg1, msg2}
}
func itShouldSuccessfullyPublishMessages(streamId int, topicId int, messages
[]iggcon.IggyMessage, client iggy.MessageStream) {
diff --git a/foreign/go/errors/constants.go b/foreign/go/errors/constants.go
index 3a62c1ba..2de4ed94 100644
--- a/foreign/go/errors/constants.go
+++ b/foreign/go/errors/constants.go
@@ -18,6 +18,10 @@
package ierror
var (
+ ResourceNotFound = &IggyError{
+ Code: 20,
+ Message: "resource_not_found",
+ }
StreamIdNotFound = &IggyError{
Code: 1009,
Message: "stream_id_not_found",
@@ -26,12 +30,20 @@ var (
Code: 2010,
Message: "topic_id_not_found",
}
+ InvalidMessagePayloadLength = &IggyError{
+ Code: 4025,
+ Message: "invalid_message_payload_length",
+ }
+ TooBigUserMessagePayload = &IggyError{
+ Code: 4022,
+ Message: "too_big_message_payload",
+ }
+ TooBigUserHeaders = &IggyError{
+ Code: 4017,
+ Message: "too_big_headers_payload",
+ }
ConsumerGroupIdNotFound = &IggyError{
Code: 5000,
Message: "consumer_group_not_found",
}
- ResourceNotFound = &IggyError{
- Code: 20,
- Message: "resource_not_found",
- }
)
diff --git a/foreign/go/samples/producer/producer.go
b/foreign/go/samples/producer/producer.go
index 103f33af..1fcc0188 100644
--- a/foreign/go/samples/producer/producer.go
+++ b/foreign/go/samples/producer/producer.go
@@ -118,7 +118,7 @@ func PublishMessages(messageStream MessageStream) error {
debugMessages = append(debugMessages, message)
messages = append(messages, IggyMessage{
Header: MessageHeader{
- Id: uuid.New(),
+ Id: MessageID(uuid.New()),
OriginTimestamp:
uint64(time.Now().UnixMicro()),
UserHeaderLength: 0,
PayloadLength: uint32(len(json)),