This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new c4373346 chore(sdk): support zero copy schema in Go SDK (#1897)
c4373346 is described below

commit c4373346a2d85c67456b956975566d18b3e3a504
Author: Assan Kozhin <[email protected]>
AuthorDate: Mon Jun 23 00:27:05 2025 +0800

    chore(sdk): support zero copy schema in Go SDK (#1897)
    
    Fix code separation between the binarySerialization and contract
    submodules. Decided that the binary format of the header is part of the
    contract now, but this can easily be changed later if there is disagreement.
    Added more tests to cover serialization: added one and fixed two. Decided
    to keep the scope of this PR small.
    
    ---------
    
    Co-authored-by: Piotr Gankiewicz <[email protected]>
---
 foreign/go/README.md                               |   3 +
 .../go/benchmarks/send_messages_benchmark_test.go  |   6 +-
 .../binary_response_deserializer.go                | 142 ++++---------------
 .../create_stream_serializer.go                    |   1 +
 .../fetch_messages_request_serializer.go           |   1 +
 .../fetch_messages_request_serializer_test.go      |  95 +++++++------
 .../send_messages_request_serializer.go            | 119 ++++++++--------
 .../send_messages_request_serializer_test.go       |  82 +++++++++++
 foreign/go/contracts/message_header.go             |  88 ++++++++----
 foreign/go/contracts/message_polling.go            |   2 +-
 foreign/go/contracts/messages.go                   |  64 +++++----
 foreign/go/contracts/user_headers.go               | 152 +++++++++++++++++++++
 foreign/go/e2e/tcp_test/messages_feature_send.go   |   2 +-
 foreign/go/e2e/tcp_test/messages_steps.go          |  17 +--
 foreign/go/e2e/tcp_test/tcp_suite_test.go          |   2 +-
 foreign/go/e2e/tcp_test/test_helpers.go            |   5 +-
 foreign/go/errors/errors.go                        |   8 ++
 foreign/go/go.mod                                  |  10 +-
 foreign/go/go.sum                                  |  12 +-
 foreign/go/samples/consumer/consumer.go            |  10 +-
 foreign/go/samples/producer/producer.go            |  12 +-
 foreign/go/tcp/tcp_core.go                         |   1 +
 22 files changed, 519 insertions(+), 315 deletions(-)

diff --git a/foreign/go/README.md b/foreign/go/README.md
new file mode 100644
index 00000000..77ebee70
--- /dev/null
+++ b/foreign/go/README.md
@@ -0,0 +1,3 @@
+# Tests
+
+Use ginkgo to run e2e tests.
diff --git a/foreign/go/benchmarks/send_messages_benchmark_test.go 
b/foreign/go/benchmarks/send_messages_benchmark_test.go
index fd43c879..74261c1a 100644
--- a/foreign/go/benchmarks/send_messages_benchmark_test.go
+++ b/foreign/go/benchmarks/send_messages_benchmark_test.go
@@ -139,8 +139,8 @@ func cleanupInfrastructure(messageStream 
iggy.MessageStream, streamId int) error
 }
 
 // CreateMessages creates messages with random payloads.
-func CreateMessages(messagesCount, messageSize int) []iggcon.Message {
-       messages := make([]iggcon.Message, messagesCount)
+func CreateMessages(messagesCount, messageSize int) []iggcon.IggyMessage {
+       messages := make([]iggcon.IggyMessage, messagesCount)
        for i := 0; i < messagesCount; i++ {
                payload := make([]byte, messageSize)
                for j := 0; j < messageSize; j++ {
@@ -148,7 +148,7 @@ func CreateMessages(messagesCount, messageSize int) 
[]iggcon.Message {
                }
                id, _ := uuid.NewUUID()
 
-               messages[i] = iggcon.Message{Id: id, Payload: payload}
+               messages[i] = iggcon.NewIggyMessage(id, payload)
        }
        return messages
 }
diff --git a/foreign/go/binary_serialization/binary_response_deserializer.go 
b/foreign/go/binary_serialization/binary_response_deserializer.go
index 806eab1a..c4cd5324 100644
--- a/foreign/go/binary_serialization/binary_response_deserializer.go
+++ b/foreign/go/binary_serialization/binary_response_deserializer.go
@@ -23,9 +23,7 @@ import (
        "fmt"
        "time"
 
-       "github.com/google/uuid"
        . "github.com/iggy-rs/iggy-go-client/contracts"
-       iggcon "github.com/iggy-rs/iggy-go-client/contracts"
        ierror "github.com/iggy-rs/iggy-go-client/errors"
        "github.com/klauspost/compress/s2"
 )
@@ -114,44 +112,42 @@ func DeserializeFetchMessagesResponse(payload []byte, 
compression IggyMessageCom
                return &FetchMessagesResponse{
                        PartitionId:   0,
                        CurrentOffset: 0,
-                       Messages:      make([]MessageResponse, 0),
+                       Messages:      make([]IggyMessage, 0),
                }, nil
        }
 
-       propertiesSize := 45
        length := len(payload)
-       partitionId := int(binary.LittleEndian.Uint32(payload[0:4]))
+       partitionId := binary.LittleEndian.Uint32(payload[0:4])
        currentOffset := binary.LittleEndian.Uint64(payload[4:12])
-       messagesCount := int(binary.LittleEndian.Uint32(payload[12:16]))
+       messagesCount := binary.LittleEndian.Uint32(payload[12:16])
        position := 16
-       var messages = make([]MessageResponse, 0)
+       var messages = make([]IggyMessage, 0)
        for position < length {
-               offset := binary.LittleEndian.Uint64(payload[position : 
position+8])
-               state, err := mapMessageState(payload[position+8])
-               timestamp := binary.LittleEndian.Uint64(payload[position+9 : 
position+17])
-               id, err := uuid.FromBytes(payload[position+17 : position+33])
-               checksum := binary.LittleEndian.Uint32(payload[position+33 : 
position+37])
-               headersLength := 
int(binary.LittleEndian.Uint32(payload[position+37 : position+41]))
-               headers, err := deserializeHeaders(payload[position+41 : 
position+41+headersLength])
+               if position+MessageHeaderSize >= length {
+                       // body needs to be at least 1 byte
+                       break
+               }
+               header, err := MessageHeaderFromBytes(payload[position : 
position+MessageHeaderSize])
                if err != nil {
                        return nil, err
                }
-               position += headersLength
-               messageLength := binary.LittleEndian.Uint32(payload[position+41 
: position+45])
-
-               payloadRangeStart := position + propertiesSize
-               payloadRangeEnd := payloadRangeStart + int(messageLength)
-               if payloadRangeStart > length || payloadRangeEnd > length {
+               position += MessageHeaderSize
+               payload_end := position + int(header.PayloadLength)
+               if int(payload_end) > length {
                        break
                }
+               payloadSlice := payload[position:payload_end]
+               position = int(payload_end)
 
-               payloadSlice := payload[payloadRangeStart:payloadRangeEnd]
-               totalSize := propertiesSize + int(messageLength)
-               position += totalSize
+               var user_headers []byte = nil
+               if header.UserHeaderLength > 0 {
+                       user_headers = payload[position : 
position+int(header.UserHeaderLength)]
+               }
+               position += int(header.UserHeaderLength)
 
                switch compression {
-               case iggcon.MESSAGE_COMPRESSION_S2, 
iggcon.MESSAGE_COMPRESSION_S2_BETTER, iggcon.MESSAGE_COMPRESSION_S2_BEST:
-                       if messageLength < 32 {
+               case MESSAGE_COMPRESSION_S2, MESSAGE_COMPRESSION_S2_BETTER, 
MESSAGE_COMPRESSION_S2_BEST:
+                       if length < 32 {
                                break
                        }
                        payloadSlice, err = s2.Decode(nil, payloadSlice)
@@ -160,19 +156,11 @@ func DeserializeFetchMessagesResponse(payload []byte, 
compression IggyMessageCom
                        }
                }
 
-               messages = append(messages, MessageResponse{
-                       Id:        id,
-                       Payload:   payloadSlice,
-                       Offset:    offset,
-                       Timestamp: timestamp,
-                       Checksum:  checksum,
-                       State:     state,
-                       Headers:   headers,
+               messages = append(messages, IggyMessage{
+                       Header:      *header,
+                       Payload:     payloadSlice,
+                       UserHeaders: user_headers,
                })
-
-               if position+propertiesSize >= length {
-                       break
-               }
        }
 
        // !TODO: Add message offset ordering
@@ -184,84 +172,6 @@ func DeserializeFetchMessagesResponse(payload []byte, 
compression IggyMessageCom
        }, nil
 }
 
-func mapMessageState(state byte) (MessageState, error) {
-       switch state {
-       case 1:
-               return MessageStateAvailable, nil
-       case 10:
-               return MessageStateUnavailable, nil
-       case 20:
-               return MessageStatePoisoned, nil
-       case 30:
-               return MessageStateMarkedForDeletion, nil
-       default:
-               return 0, errors.New("Invalid message state")
-       }
-}
-
-func deserializeHeaders(payload []byte) (map[HeaderKey]HeaderValue, error) {
-       headers := make(map[HeaderKey]HeaderValue)
-       position := 0
-
-       for position < len(payload) {
-               if len(payload) <= position+4 {
-                       return nil, errors.New("Invalid header key length")
-               }
-
-               keyLength := binary.LittleEndian.Uint32(payload[position : 
position+4])
-               if keyLength == 0 || 255 < keyLength {
-                       return nil, errors.New("Key has incorrect size, must be 
between 1 and 255")
-               }
-               position += 4
-
-               if len(payload) < position+int(keyLength) {
-                       return nil, errors.New("Invalid header key")
-               }
-
-               key := string(payload[position : position+int(keyLength)])
-               position += int(keyLength)
-
-               headerKind, err := deserializeHeaderKind(payload, position)
-               if err != nil {
-                       return nil, err
-               }
-               position++
-
-               if len(payload) <= position+4 {
-                       return nil, errors.New("Invalid header value length")
-               }
-
-               valueLength := binary.LittleEndian.Uint32(payload[position : 
position+4])
-               position += 4
-
-               if valueLength == 0 || 255 < valueLength {
-                       return nil, errors.New("Value has incorrect size, must 
be between 1 and 255")
-               }
-
-               if len(payload) < position+int(valueLength) {
-                       return nil, errors.New("Invalid header value")
-               }
-
-               value := payload[position : position+int(valueLength)]
-               position += int(valueLength)
-
-               headers[HeaderKey{Value: key}] = HeaderValue{
-                       Kind:  headerKind,
-                       Value: value,
-               }
-       }
-
-       return headers, nil
-}
-
-func deserializeHeaderKind(payload []byte, position int) (HeaderKind, error) {
-       if position >= len(payload) {
-               return 0, errors.New("Invalid header kind position")
-       }
-
-       return HeaderKind(payload[position]), nil
-}
-
 func DeserializeTopics(payload []byte) ([]TopicResponse, error) {
        topics := make([]TopicResponse, 0)
        length := len(payload)
@@ -383,7 +293,7 @@ func DeserializeToConsumerGroup(payload []byte, position 
int) (*ConsumerGroupRes
 
 func DeserializeUsers(payload []byte) ([]*UserResponse, error) {
        if len(payload) == 0 {
-               return nil, errors.New("Empty payload")
+               return nil, errors.New("empty payload")
        }
 
        var result []*UserResponse
diff --git a/foreign/go/binary_serialization/create_stream_serializer.go 
b/foreign/go/binary_serialization/create_stream_serializer.go
index a73596b8..d32cdfbd 100644
--- a/foreign/go/binary_serialization/create_stream_serializer.go
+++ b/foreign/go/binary_serialization/create_stream_serializer.go
@@ -19,6 +19,7 @@ package binaryserialization
 
 import (
        "encoding/binary"
+
        iggcon "github.com/iggy-rs/iggy-go-client/contracts"
 )
 
diff --git 
a/foreign/go/binary_serialization/fetch_messages_request_serializer.go 
b/foreign/go/binary_serialization/fetch_messages_request_serializer.go
index 2aefbbb8..9630432e 100644
--- a/foreign/go/binary_serialization/fetch_messages_request_serializer.go
+++ b/foreign/go/binary_serialization/fetch_messages_request_serializer.go
@@ -19,6 +19,7 @@ package binaryserialization
 
 import (
        "encoding/binary"
+
        iggcon "github.com/iggy-rs/iggy-go-client/contracts"
 )
 
diff --git 
a/foreign/go/binary_serialization/fetch_messages_request_serializer_test.go 
b/foreign/go/binary_serialization/fetch_messages_request_serializer_test.go
index 3b5e2e9e..3d20f4b1 100644
--- a/foreign/go/binary_serialization/fetch_messages_request_serializer_test.go
+++ b/foreign/go/binary_serialization/fetch_messages_request_serializer_test.go
@@ -17,48 +17,59 @@
 
 package binaryserialization
 
-//func TestSerialize_TcpFetchMessagesRequest(t *testing.T) {
-//     // Create a sample TcpFetchMessagesRequest
-//     request := TcpFetchMessagesRequest{
-//             FetchMessagesRequest: iggcon.FetchMessagesRequest{
-//                     Consumer: iggcon.Consumer{
-//                             Kind: iggcon.ConsumerSingle,
-//                             Id:   42,
-//                     },
-//                     StreamId:        iggcon.NewIdentifier("test_stream_id"),
-//                     TopicId:         iggcon.NewIdentifier("test_topic_id"),
-//                     PartitionId:     123,
-//                     PollingStrategy: iggcon.FirstPollingStrategy(),
-//                     Count:           100,
-//                     AutoCommit:      true,
-//             },
-//     }
-//
-//     // Serialize the request
-//     serialized := request.Serialize()
-//
-//     // Expected serialized bytes based on the provided sample request
-//     expected := []byte{
-//             0x01,                   // Consumer Kind
-//             0x2A, 0x00, 0x00, 0x00, // Consumer ID
-//             0x02,                                                           
                    // StreamId Kind (StringId)
-//             0x0E,                                                           
                    // StreamId Length (14)
-//             0x74, 0x65, 0x73, 0x74, 0x5F, 0x73, 0x74, 0x72, 0x65, 0x61, 
0x6D, 0x5F, 0x69, 0x64, // StreamId
-//             0x02,                                                           
              // TopicId Kind (StringId)
-//             0x0D,                                                           
              // TopicId Length (13)
-//             0x74, 0x65, 0x73, 0x74, 0x5F, 0x74, 0x6F, 0x70, 0x69, 0x63, 
0x5F, 0x69, 0x64, // TopicId
-//             0x7B, 0x00, 0x00, 0x00, // PartitionId (123)
-//             0x03,                                           // 
PollingStrategy Kind
-//             0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 
PollingStrategy Value (0)
-//             0x64, 0x00, 0x00, 0x00, // Count (100)
-//             0x01, // AutoCommit
-//     }
-//
-//     // Check if the serialized bytes match the expected bytes
-//     if !areBytesEqual(serialized, expected) {
-//             t.Errorf("Serialized bytes are incorrect. 
\nExpected:\t%v\nGot:\t\t%v", expected, serialized)
-//     }
-//}
+import (
+       "testing"
+
+       iggcon "github.com/iggy-rs/iggy-go-client/contracts"
+)
+
+func TestSerialize_TcpFetchMessagesRequest(t *testing.T) {
+       // Create a sample TcpFetchMessagesRequest
+       request := TcpFetchMessagesRequest{
+               FetchMessagesRequest: iggcon.FetchMessagesRequest{
+                       Consumer: iggcon.Consumer{
+                               Kind: iggcon.ConsumerSingle,
+                               Id:   iggcon.NewIdentifier(42),
+                       },
+                       StreamId:        iggcon.NewIdentifier("test_stream_id"),
+                       TopicId:         iggcon.NewIdentifier("test_topic_id"),
+                       PartitionId:     123,
+                       PollingStrategy: iggcon.FirstPollingStrategy(),
+                       Count:           100,
+                       AutoCommit:      true,
+               },
+       }
+
+       // Serialize the request
+       serialized := request.Serialize()
+
+       // Expected serialized bytes based on the provided sample request
+       expected := []byte{
+               0x01,                 // Consumer Kind
+               0x01,                 // ConsumerId Kind (NumericId)
+               0x04,                 // ConsumerId Length (4)
+               0x2A, 0x00, 0x0, 0x0, // ConsumerId
+
+               0x02,                                                           
                    // StreamId Kind (StringId)
+               0x0E,                                                           
                    // StreamId Length (14)
+               0x74, 0x65, 0x73, 0x74, 0x5F, 0x73, 0x74, 0x72, 0x65, 0x61, 
0x6D, 0x5F, 0x69, 0x64, // StreamId
+
+               0x02,                                                           
              // TopicId Kind (StringId)
+               0x0D,                                                           
              // TopicId Length (13)
+               0x74, 0x65, 0x73, 0x74, 0x5F, 0x74, 0x6F, 0x70, 0x69, 0x63, 
0x5F, 0x69, 0x64, // TopicId
+
+               0x7B, 0x00, 0x00, 0x00, // PartitionId (123)
+               0x03,                                           // 
PollingStrategy Kind
+               0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 
PollingStrategy Value (0)
+               0x64, 0x00, 0x00, 0x00, // Count (100)
+               0x01, // AutoCommit
+       }
+
+       // Check if the serialized bytes match the expected bytes
+       if !areBytesEqual(serialized, expected) {
+               t.Errorf("Serialized bytes are incorrect. 
\nExpected:\t%v\nGot:\t\t%v", expected, serialized)
+       }
+}
 
 func areBytesEqual(a, b []byte) bool {
        if len(a) != len(b) {
diff --git 
a/foreign/go/binary_serialization/send_messages_request_serializer.go 
b/foreign/go/binary_serialization/send_messages_request_serializer.go
index 6288026b..f7a3a204 100644
--- a/foreign/go/binary_serialization/send_messages_request_serializer.go
+++ b/foreign/go/binary_serialization/send_messages_request_serializer.go
@@ -28,6 +28,8 @@ type TcpSendMessagesRequest struct {
        iggcon.SendMessagesRequest
 }
 
+const indexSize = 16
+
 func (request *TcpSendMessagesRequest) Serialize(compression 
iggcon.IggyMessageCompression) []byte {
        for i, message := range request.Messages {
                switch compression {
@@ -36,94 +38,91 @@ func (request *TcpSendMessagesRequest) 
Serialize(compression iggcon.IggyMessageC
                                break
                        }
                        request.Messages[i].Payload = s2.Encode(nil, 
message.Payload)
+                       message.Header.PayloadLength = 
uint32(len(message.Payload))
                case iggcon.MESSAGE_COMPRESSION_S2_BETTER:
                        if len(message.Payload) < 32 {
                                break
                        }
                        request.Messages[i].Payload = s2.EncodeBetter(nil, 
message.Payload)
+                       message.Header.PayloadLength = 
uint32(len(message.Payload))
                case iggcon.MESSAGE_COMPRESSION_S2_BEST:
                        if len(message.Payload) < 32 {
                                break
                        }
                        request.Messages[i].Payload = s2.EncodeBest(nil, 
message.Payload)
+                       message.Header.PayloadLength = 
uint32(len(message.Payload))
                }
        }
 
-       streamTopicIdLength := 2 + request.StreamId.Length + 2 + 
request.TopicId.Length
+       streamIdFieldSize := 2 + request.StreamId.Length
+       topicIdFieldSize := 2 + request.TopicId.Length
+       partitioningFieldSize := 2 + request.Partitioning.Length
+       metadataLenFieldSize := 4 // uint32
+       messageCount := len(request.Messages)
+       messagesCountFieldSize := 4 // uint32
+       metadataLen := streamIdFieldSize +
+               topicIdFieldSize +
+               partitioningFieldSize +
+               messagesCountFieldSize
+       indexesSize := messageCount * indexSize
        messageBytesCount := calculateMessageBytesCount(request.Messages)
-       totalSize := streamTopicIdLength + messageBytesCount + 
request.Partitioning.Length + 2
+       totalSize := metadataLenFieldSize +
+               streamIdFieldSize +
+               topicIdFieldSize +
+               partitioningFieldSize +
+               messagesCountFieldSize +
+               indexesSize +
+               messageBytesCount
+
        bytes := make([]byte, totalSize)
+
        position := 0
+
+       //metadata
+       binary.LittleEndian.PutUint32(bytes[:4], uint32(metadataLen))
+       position = 4
        //ids
-       copy(bytes[position:2+request.StreamId.Length], 
SerializeIdentifier(request.StreamId))
-       copy(bytes[position+2+request.StreamId.Length:streamTopicIdLength], 
SerializeIdentifier(request.TopicId))
-       position = streamTopicIdLength
+       copy(bytes[position:position+streamIdFieldSize], 
SerializeIdentifier(request.StreamId))
+       
copy(bytes[position+streamIdFieldSize:position+streamIdFieldSize+topicIdFieldSize],
 SerializeIdentifier(request.TopicId))
+       position += streamIdFieldSize + topicIdFieldSize
 
        //partitioning
-       bytes[streamTopicIdLength] = byte(request.Partitioning.Kind)
-       bytes[streamTopicIdLength+1] = byte(request.Partitioning.Length)
-       
copy(bytes[streamTopicIdLength+2:streamTopicIdLength+2+request.Partitioning.Length],
 []byte(request.Partitioning.Value))
-       position = streamTopicIdLength + 2 + request.Partitioning.Length
-
-       emptyHeaders := make([]byte, 4)
+       bytes[position] = byte(request.Partitioning.Kind)
+       bytes[position+1] = byte(request.Partitioning.Length)
+       copy(bytes[position+2:position+partitioningFieldSize], 
[]byte(request.Partitioning.Value))
+       position += partitioningFieldSize
+       binary.LittleEndian.PutUint32(bytes[position:position+4], 
uint32(messageCount))
+       position += 4
+
+       currentIndexPosition := position
+       for i := 0; i < indexesSize; i++ {
+               bytes[position+i] = 0
+       }
+       position += indexesSize
 
+       msgSize := uint32(0)
        for _, message := range request.Messages {
-               copy(bytes[position:position+16], message.Id[:])
-               if message.Headers != nil {
-                       headersBytes := getHeadersBytes(message.Headers)
-                       
binary.LittleEndian.PutUint32(bytes[position+16:position+20], 
uint32(len(headersBytes)))
-                       copy(bytes[position+20:position+20+len(headersBytes)], 
headersBytes)
-                       position += len(headersBytes) + 20
-               } else {
-                       copy(bytes[position+16:position+16+len(emptyHeaders)], 
emptyHeaders)
-                       position += 20
-               }
-
-               binary.LittleEndian.PutUint32(bytes[position:position+4], 
uint32(len(message.Payload)))
-               copy(bytes[position+4:position+4+len(message.Payload)], 
message.Payload)
-               position += len(message.Payload) + 4
+               copy(bytes[position:position+iggcon.MessageHeaderSize], 
message.Header.ToBytes())
+               
copy(bytes[position+iggcon.MessageHeaderSize:position+iggcon.MessageHeaderSize+int(message.Header.PayloadLength)],
 message.Payload)
+               position += iggcon.MessageHeaderSize + 
int(message.Header.PayloadLength)
+               
copy(bytes[position:position+int(message.Header.UserHeaderLength)], 
message.UserHeaders)
+               position += int(message.Header.UserHeaderLength)
+
+               msgSize += iggcon.MessageHeaderSize + 
message.Header.PayloadLength + message.Header.UserHeaderLength
+
+               
binary.LittleEndian.PutUint32(bytes[currentIndexPosition:currentIndexPosition+4],
 0)
+               
binary.LittleEndian.PutUint32(bytes[currentIndexPosition+4:currentIndexPosition+8],
 uint32(msgSize))
+               
binary.LittleEndian.PutUint32(bytes[currentIndexPosition+8:currentIndexPosition+12],
 0)
+               currentIndexPosition += indexSize
        }
 
        return bytes
 }
 
-func calculateMessageBytesCount(messages []iggcon.Message) int {
+func calculateMessageBytesCount(messages []iggcon.IggyMessage) int {
        count := 0
        for _, msg := range messages {
-               count += 16 + 4 + len(msg.Payload) + 4
-               for key, header := range msg.Headers {
-                       count += 4 + len(key.Value) + 1 + 4 + len(header.Value)
-               }
+               count += iggcon.MessageHeaderSize + len(msg.Payload) + 
len(msg.UserHeaders)
        }
        return count
 }
-
-func getHeadersBytes(headers map[iggcon.HeaderKey]iggcon.HeaderValue) []byte {
-       headersLength := 0
-       for key, header := range headers {
-               headersLength += 4 + len(key.Value) + 1 + 4 + len(header.Value)
-       }
-       headersBytes := make([]byte, headersLength)
-       position := 0
-       for key, value := range headers {
-               headerBytes := getBytesFromHeader(key, value)
-               copy(headersBytes[position:position+len(headerBytes)], 
headerBytes)
-               position += len(headerBytes)
-       }
-       return headersBytes
-}
-
-func getBytesFromHeader(key iggcon.HeaderKey, value iggcon.HeaderValue) []byte 
{
-       headerBytesLength := 4 + len(key.Value) + 1 + 4 + len(value.Value)
-       headerBytes := make([]byte, headerBytesLength)
-
-       binary.LittleEndian.PutUint32(headerBytes[:4], uint32(len(key.Value)))
-       copy(headerBytes[4:4+len(key.Value)], key.Value)
-
-       headerBytes[4+len(key.Value)] = byte(value.Kind)
-
-       
binary.LittleEndian.PutUint32(headerBytes[4+len(key.Value)+1:4+len(key.Value)+1+4],
 uint32(len(value.Value)))
-       copy(headerBytes[4+len(key.Value)+1+4:], value.Value)
-
-       return headerBytes
-}
diff --git 
a/foreign/go/binary_serialization/send_messages_request_serializer_test.go 
b/foreign/go/binary_serialization/send_messages_request_serializer_test.go
new file mode 100644
index 00000000..f28ff19d
--- /dev/null
+++ b/foreign/go/binary_serialization/send_messages_request_serializer_test.go
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package binaryserialization
+
+import (
+       "testing"
+
+       "github.com/google/uuid"
+       iggcon "github.com/iggy-rs/iggy-go-client/contracts"
+)
+
+func TestSerialize_SendMessagesRequest(t *testing.T) {
+       message1 := generateTestMessage("data1")
+       request := TcpSendMessagesRequest{
+               SendMessagesRequest: iggcon.SendMessagesRequest{
+                       StreamId:     iggcon.NewIdentifier("test_stream_id"),
+                       TopicId:      iggcon.NewIdentifier("test_topic_id"),
+                       Partitioning: iggcon.PartitionId(1),
+                       Messages: []iggcon.IggyMessage{
+                               message1,
+                       },
+               },
+       }
+
+       // Serialize the request
+       serialized := request.Serialize(iggcon.MESSAGE_COMPRESSION_NONE)
+
+       // Expected serialized bytes based on the provided sample request
+       expected := []byte{
+               0x29, 0x0, 0x0, 0x0, // metadataLength
+               0x02,                                                           
                    // StreamId Kind (StringId)
+               0x0E,                                                           
                    // StreamId Length (14)
+               0x74, 0x65, 0x73, 0x74, 0x5F, 0x73, 0x74, 0x72, 0x65, 0x61, 
0x6D, 0x5F, 0x69, 0x64, // StreamId
+
+               0x02,                                                           
              // TopicId Kind (StringId)
+               0x0D,                                                           
              // TopicId Length (13)
+               0x74, 0x65, 0x73, 0x74, 0x5F, 0x74, 0x6F, 0x70, 0x69, 0x63, 
0x5F, 0x69, 0x64, // TopicId
+               0x02,                   // PartitionIdKind
+               0x04,                   // Partitioning Length
+               0x01, 0x00, 0x00, 0x00, // PartitionId (1)
+               0x01, 0x0, 0x0, 0x0, // MessageCount
+               0, 0, 0, 0, 110, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // Index 
(16*1) bytes
+       }
+       expected = append(expected, message1.Header.ToBytes()...)
+       expected = append(expected, message1.Payload...)
+       expected = append(expected, message1.UserHeaders...)
+
+       // Check if the serialized bytes match the expected bytes
+       if !areBytesEqual(serialized, expected) {
+               t.Errorf("Serialized bytes are incorrect. 
\nExpected:\t%v\nGot:\t\t%v", expected, serialized)
+       }
+}
+
+func createDefaultMessageHeaders() map[iggcon.HeaderKey]iggcon.HeaderValue {
+       return map[iggcon.HeaderKey]iggcon.HeaderValue{
+               {Value: "HeaderKey1"}: {Kind: iggcon.String, Value: 
[]byte("Value 1")},
+               {Value: "HeaderKey2"}: {Kind: iggcon.Uint32, Value: 
[]byte{0x01, 0x02, 0x03, 0x04}},
+       }
+}
+
+func generateTestMessage(payload string) iggcon.IggyMessage {
+       return iggcon.NewIggyMessageWithHeaders(
+               uuid.New(),
+               []byte(payload),
+               createDefaultMessageHeaders(),
+       )
+}
diff --git a/foreign/go/contracts/message_header.go 
b/foreign/go/contracts/message_header.go
index d33642b2..c0c76df3 100644
--- a/foreign/go/contracts/message_header.go
+++ b/foreign/go/contracts/message_header.go
@@ -17,40 +17,70 @@
 
 package iggcon
 
-import "errors"
+import (
+       "encoding/binary"
+       "errors"
+       "time"
 
-type HeaderValue struct {
-       Kind  HeaderKind
-       Value []byte
+       "github.com/google/uuid"
+)
+
+const MessageHeaderSize = 8 + 16 + 8 + 8 + 8 + 4 + 4
+
+type MessageHeader struct {
+       Checksum         uint64    `json:"checksum"`
+       Id               uuid.UUID `json:"id"`
+       Offset           uint64    `json:"offset"`
+       Timestamp        uint64    `json:"timestamp"`
+       OriginTimestamp  uint64    `json:"origin_timestamp"`
+       UserHeaderLength uint32    `json:"user_header_length"`
+       PayloadLength    uint32    `json:"payload_length"`
 }
 
-type HeaderKey struct {
-       Value string
+func NewMessageHeader(id uuid.UUID, payloadLength uint32, userHeaderLength uint32) MessageHeader {
+       return MessageHeader{
+               Id:               id,
+               OriginTimestamp:  uint64(time.Now().UnixMicro()),
+               PayloadLength:    payloadLength,
+               UserHeaderLength: userHeaderLength,
+       }
 }
 
-func NewHeaderKey(val string) (HeaderKey, error) {
-       if len(val) == 0 || len(val) > 255 {
-               return HeaderKey{}, errors.New("Value has incorrect size, must be between 1 and 255")
+func MessageHeaderFromBytes(data []byte) (*MessageHeader, error) {
+
+       if len(data) != MessageHeaderSize {
+               return nil, errors.New("data has incorrect size, must be 56")
        }
-       return HeaderKey{Value: val}, nil
+       checksum := binary.LittleEndian.Uint64(data[0:8])
+       id, _ := uuid.FromBytes(data[8:24])
+       timestamp := binary.LittleEndian.Uint64(data[24:32])
+       origin_timestamp := binary.LittleEndian.Uint64(data[32:40])
+       offset := binary.LittleEndian.Uint64(data[40:48])
+       user_header_length := binary.LittleEndian.Uint32(data[48:52])
+       payload_length := binary.LittleEndian.Uint32(data[52:56])
+
+       return &MessageHeader{
+               Checksum:         checksum,
+               Id:               id,
+               Offset:           offset,
+               Timestamp:        timestamp,
+               OriginTimestamp:  origin_timestamp,
+               UserHeaderLength: user_header_length,
+               PayloadLength:    payload_length,
+       }, nil
 }
 
-type HeaderKind int
-
-const (
-       Raw     HeaderKind = 1
-       String  HeaderKind = 2
-       Bool    HeaderKind = 3
-       Int8    HeaderKind = 4
-       Int16   HeaderKind = 5
-       Int32   HeaderKind = 6
-       Int64   HeaderKind = 7
-       Int128  HeaderKind = 8
-       Uint8   HeaderKind = 9
-       Uint16  HeaderKind = 10
-       Uint32  HeaderKind = 11
-       Uint64  HeaderKind = 12
-       Uint128 HeaderKind = 13
-       Float   HeaderKind = 14
-       Double  HeaderKind = 15
-)
+func (mh *MessageHeader) ToBytes() []byte {
+       bytes := make([]byte, 0, MessageHeaderSize)
+
+       bytes = binary.LittleEndian.AppendUint64(bytes, mh.Checksum)
+       idBytes, _ := uuid.UUID.MarshalBinary(mh.Id)
+       bytes = append(bytes, idBytes...)
+       bytes = binary.LittleEndian.AppendUint64(bytes, mh.Offset)
+       bytes = binary.LittleEndian.AppendUint64(bytes, mh.Timestamp)
+       bytes = binary.LittleEndian.AppendUint64(bytes, mh.OriginTimestamp)
+       bytes = binary.LittleEndian.AppendUint32(bytes, mh.UserHeaderLength)
+       bytes = binary.LittleEndian.AppendUint32(bytes, mh.PayloadLength)
+
+       return bytes
+}
diff --git a/foreign/go/contracts/message_polling.go b/foreign/go/contracts/message_polling.go
index 67cd37aa..df9bdad9 100644
--- a/foreign/go/contracts/message_polling.go
+++ b/foreign/go/contracts/message_polling.go
@@ -22,7 +22,7 @@ type PollingStrategy struct {
        Value uint64
 }
 
-type MessagePolling int
+type MessagePolling byte
 
 const (
        POLLING_OFFSET    MessagePolling = 1
diff --git a/foreign/go/contracts/messages.go b/foreign/go/contracts/messages.go
index fb1e9b3d..b4ebe657 100644
--- a/foreign/go/contracts/messages.go
+++ b/foreign/go/contracts/messages.go
@@ -32,48 +32,46 @@ type FetchMessagesRequest struct {
 }
 
 type FetchMessagesResponse struct {
-       PartitionId   int
+       PartitionId   uint32
        CurrentOffset uint64
-       Messages      []MessageResponse
-       MessageCount  int
+       MessageCount  uint32
+       Messages      []IggyMessage
 }
 
-type MessageResponse struct {
-       Offset    uint64                    `json:"offset"`
-       Timestamp uint64                    `json:"timestamp"`
-       Checksum  uint32                    `json:"checksum"`
-       Id        uuid.UUID                 `json:"id"`
-       Payload   []byte                    `json:"payload"`
-       Headers   map[HeaderKey]HeaderValue `json:"headers,omitempty"`
-       State     MessageState              `json:"state"`
+type SendMessagesRequest struct {
+       StreamId     Identifier    `json:"streamId"`
+       TopicId      Identifier    `json:"topicId"`
+       Partitioning Partitioning  `json:"partitioning"`
+       Messages     []IggyMessage `json:"messages"`
 }
 
-type MessageState int
-
-const (
-       MessageStateAvailable MessageState = iota
-       MessageStateUnavailable
-       MessageStatePoisoned
-       MessageStateMarkedForDeletion
-)
-
-type SendMessagesRequest struct {
-       StreamId     Identifier   `json:"streamId"`
-       TopicId      Identifier   `json:"topicId"`
-       Partitioning Partitioning `json:"partitioning"`
-       Messages     []Message    `json:"messages"`
+type ReceivedMessage struct {
+       Message       IggyMessage
+       CurrentOffset uint64
+       PartitionId   uint32
 }
 
-type Message struct {
-       Id      uuid.UUID
-       Payload []byte
-       Headers map[HeaderKey]HeaderValue
+type IggyMessage struct {
+       Header      MessageHeader
+       Payload     []byte
+       UserHeaders []byte
 }
 
-func NewMessage(payload []byte, headers map[HeaderKey]HeaderValue) Message {
-       return Message{
-               Id:      uuid.New(),
+func NewIggyMessage(id uuid.UUID, payload []byte) IggyMessage {
+       return IggyMessage{
+               Header:  NewMessageHeader(id, uint32(len(payload)), 0),
                Payload: payload,
-               Headers: headers,
        }
 }
+
+func NewIggyMessageWithHeaders(id uuid.UUID, payload []byte, userHeaders map[HeaderKey]HeaderValue) IggyMessage {
+       userHeaderBytes := GetHeadersBytes(userHeaders)
+       messageHeader := NewMessageHeader(id, uint32(len(payload)), 0)
+       messageHeader.UserHeaderLength = uint32(len(userHeaderBytes))
+       iggyMessage := IggyMessage{
+               Header:      messageHeader,
+               Payload:     payload,
+               UserHeaders: userHeaderBytes,
+       }
+       return iggyMessage
+}
diff --git a/foreign/go/contracts/user_headers.go b/foreign/go/contracts/user_headers.go
new file mode 100644
index 00000000..3349c895
--- /dev/null
+++ b/foreign/go/contracts/user_headers.go
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iggcon
+
+import (
+       "encoding/binary"
+       "errors"
+)
+
+type HeaderValue struct {
+       Kind  HeaderKind
+       Value []byte
+}
+
+type HeaderKey struct {
+       Value string
+}
+
+func NewHeaderKey(val string) (HeaderKey, error) {
+       if len(val) == 0 || len(val) > 255 {
+               return HeaderKey{}, errors.New("value has incorrect size, must be between 1 and 255")
+       }
+       return HeaderKey{Value: val}, nil
+}
+
+type HeaderKind int
+
+const (
+       Raw     HeaderKind = 1
+       String  HeaderKind = 2
+       Bool    HeaderKind = 3
+       Int8    HeaderKind = 4
+       Int16   HeaderKind = 5
+       Int32   HeaderKind = 6
+       Int64   HeaderKind = 7
+       Int128  HeaderKind = 8
+       Uint8   HeaderKind = 9
+       Uint16  HeaderKind = 10
+       Uint32  HeaderKind = 11
+       Uint64  HeaderKind = 12
+       Uint128 HeaderKind = 13
+       Float   HeaderKind = 14
+       Double  HeaderKind = 15
+)
+
+func GetHeadersBytes(headers map[HeaderKey]HeaderValue) []byte {
+       headersLength := 0
+       for key, header := range headers {
+               headersLength += 4 + len(key.Value) + 1 + 4 + len(header.Value)
+       }
+       headersBytes := make([]byte, headersLength)
+       position := 0
+       for key, value := range headers {
+               headerBytes := getBytesFromHeader(key, value)
+               copy(headersBytes[position:position+len(headerBytes)], 
headerBytes)
+               position += len(headerBytes)
+       }
+       return headersBytes
+}
+
+func getBytesFromHeader(key HeaderKey, value HeaderValue) []byte {
+       headerBytesLength := 4 + len(key.Value) + 1 + 4 + len(value.Value)
+       headerBytes := make([]byte, headerBytesLength)
+
+       binary.LittleEndian.PutUint32(headerBytes[:4], uint32(len(key.Value)))
+       copy(headerBytes[4:4+len(key.Value)], key.Value)
+
+       headerBytes[4+len(key.Value)] = byte(value.Kind)
+
+       binary.LittleEndian.PutUint32(headerBytes[4+len(key.Value)+1:4+len(key.Value)+1+4], uint32(len(value.Value)))
+       copy(headerBytes[4+len(key.Value)+1+4:], value.Value)
+
+       return headerBytes
+}
+
+func DeserializeHeaders(userHeadersBytes []byte) (map[HeaderKey]HeaderValue, 
error) {
+       headers := make(map[HeaderKey]HeaderValue)
+       position := 0
+
+       for position < len(userHeadersBytes) {
+               if len(userHeadersBytes) <= position+4 {
+                       return nil, errors.New("invalid header key length")
+               }
+
+               keyLength := 
binary.LittleEndian.Uint32(userHeadersBytes[position : position+4])
+               if keyLength == 0 || 255 < keyLength {
+                       return nil, errors.New("key has incorrect size, must be between 1 and 255")
+               }
+               position += 4
+
+               if len(userHeadersBytes) < position+int(keyLength) {
+                       return nil, errors.New("invalid header key")
+               }
+
+               key := string(userHeadersBytes[position : 
position+int(keyLength)])
+               position += int(keyLength)
+
+               headerKind, err := deserializeHeaderKind(userHeadersBytes, 
position)
+               if err != nil {
+                       return nil, err
+               }
+               position++
+
+               if len(userHeadersBytes) <= position+4 {
+                       return nil, errors.New("invalid header value length")
+               }
+
+               valueLength := 
binary.LittleEndian.Uint32(userHeadersBytes[position : position+4])
+               position += 4
+
+               if valueLength == 0 || 255 < valueLength {
+                       return nil, errors.New("value has incorrect size, must be between 1 and 255")
+               }
+
+               if len(userHeadersBytes) < position+int(valueLength) {
+                       return nil, errors.New("invalid header value")
+               }
+
+               value := userHeadersBytes[position : position+int(valueLength)]
+               position += int(valueLength)
+
+               headers[HeaderKey{Value: key}] = HeaderValue{
+                       Kind:  headerKind,
+                       Value: value,
+               }
+       }
+
+       return headers, nil
+}
+
+func deserializeHeaderKind(payload []byte, position int) (HeaderKind, error) {
+       if position >= len(payload) {
+               return 0, errors.New("invalid header kind position")
+       }
+
+       return HeaderKind(payload[position]), nil
+}
diff --git a/foreign/go/e2e/tcp_test/messages_feature_send.go 
b/foreign/go/e2e/tcp_test/messages_feature_send.go
index e8ed778f..41894f7f 100644
--- a/foreign/go/e2e/tcp_test/messages_feature_send.go
+++ b/foreign/go/e2e/tcp_test/messages_feature_send.go
@@ -95,7 +95,7 @@ var _ = Describe("SEND MESSAGES:", func() {
                                StreamId:     iggcon.NewIdentifier(streamId),
                                TopicId:      iggcon.NewIdentifier(topicId),
                                Partitioning: 
iggcon.PartitionId(int(createRandomUInt32())),
-                               Messages:     []iggcon.Message{},
+                               Messages:     []iggcon.IggyMessage{},
                        }
                        err := client.SendMessages(request)
                        itShouldReturnSpecificError(err, 
"messages_count_should_be_greater_than_zero")
diff --git a/foreign/go/e2e/tcp_test/messages_steps.go 
b/foreign/go/e2e/tcp_test/messages_steps.go
index f084ce50..56f2834d 100644
--- a/foreign/go/e2e/tcp_test/messages_steps.go
+++ b/foreign/go/e2e/tcp_test/messages_steps.go
@@ -21,6 +21,7 @@ import (
        "bytes"
        "reflect"
 
+       "github.com/google/uuid"
        "github.com/iggy-rs/iggy-go-client"
        iggcon "github.com/iggy-rs/iggy-go-client/contracts"
        . "github.com/onsi/ginkgo/v2"
@@ -34,17 +35,17 @@ func createDefaultMessageHeaders() 
map[iggcon.HeaderKey]iggcon.HeaderValue {
        }
 }
 
-func createDefaultMessages() []iggcon.Message {
+func createDefaultMessages() []iggcon.IggyMessage {
        headers := createDefaultMessageHeaders()
-       messages := []iggcon.Message{
-               iggcon.NewMessage([]byte(createRandomString(256)), headers),
-               iggcon.NewMessage([]byte(createRandomString(256)), headers),
+       messages := []iggcon.IggyMessage{
+               iggcon.NewIggyMessageWithHeaders(uuid.New(), 
[]byte(createRandomString(256)), headers),
+               iggcon.NewIggyMessageWithHeaders(uuid.New(), 
[]byte(createRandomString(256)), headers),
        }
 
        return messages
 }
 
-func itShouldSuccessfullyPublishMessages(streamId int, topicId int, messages 
[]iggcon.Message, client iggy.MessageStream) {
+func itShouldSuccessfullyPublishMessages(streamId int, topicId int, messages 
[]iggcon.IggyMessage, client iggy.MessageStream) {
        result, err := client.PollMessages(iggcon.FetchMessagesRequest{
                StreamId: iggcon.NewIdentifier(streamId),
                TopicId:  iggcon.NewIdentifier(topicId),
@@ -77,10 +78,10 @@ func itShouldSuccessfullyPublishMessages(streamId int, 
topicId int, messages []i
        })
 }
 
-func compareMessage(resultMessages []iggcon.MessageResponse, expectedMessage 
iggcon.Message) bool {
+func compareMessage(resultMessages []iggcon.IggyMessage, expectedMessage 
iggcon.IggyMessage) bool {
        for _, msg := range resultMessages {
-               if msg.Id == expectedMessage.Id && bytes.Equal(msg.Payload, 
expectedMessage.Payload) {
-                       if reflect.DeepEqual(msg.Headers, 
expectedMessage.Headers) {
+               if msg.Header.Id == expectedMessage.Header.Id && 
bytes.Equal(msg.Payload, expectedMessage.Payload) {
+                       if reflect.DeepEqual(msg.UserHeaders, 
expectedMessage.UserHeaders) {
                                return true
                        }
                }
diff --git a/foreign/go/e2e/tcp_test/tcp_suite_test.go 
b/foreign/go/e2e/tcp_test/tcp_suite_test.go
index 9a56578e..e2e78a6f 100644
--- a/foreign/go/e2e/tcp_test/tcp_suite_test.go
+++ b/foreign/go/e2e/tcp_test/tcp_suite_test.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package tcp_test_test
+package tcp_test
 
 import (
        "testing"
diff --git a/foreign/go/e2e/tcp_test/test_helpers.go 
b/foreign/go/e2e/tcp_test/test_helpers.go
index ce38fbc2..e61db65a 100644
--- a/foreign/go/e2e/tcp_test/test_helpers.go
+++ b/foreign/go/e2e/tcp_test/test_helpers.go
@@ -18,11 +18,12 @@
 package tcp_test
 
 import (
-       . "github.com/iggy-rs/iggy-go-client"
-       . "github.com/iggy-rs/iggy-go-client/contracts"
        "math/rand"
        "strings"
        "time"
+
+       . "github.com/iggy-rs/iggy-go-client"
+       . "github.com/iggy-rs/iggy-go-client/contracts"
 )
 
 func createAuthorizedConnection() MessageStream {
diff --git a/foreign/go/errors/errors.go b/foreign/go/errors/errors.go
index 89710be8..0c45beaf 100644
--- a/foreign/go/errors/errors.go
+++ b/foreign/go/errors/errors.go
@@ -61,6 +61,8 @@ func TranslateErrorCode(code int) string {
                return "invalid_format"
        case 5:
                return "feature_unavailable"
+       case 6:
+               return "invalid_identifier"
        case 10:
                return "cannot_create_base_directory"
        case 20:
@@ -109,6 +111,8 @@ func TranslateErrorCode(code int) string {
                return "cannot_parse_int"
        case 204:
                return "cannot_parse_slice"
+       case 206:
+               return "connection_closed"
        case 300:
                return "http_response_error"
        case 301:
@@ -273,6 +277,10 @@ func TranslateErrorCode(code int) string {
                return "invalid_message_checksum"
        case 4028:
                return "invalid_key_value_length"
+       case 4032:
+               return "non_zero_timestamp"
+       case 4036:
+               return "invalid_messages_size"
        case 4100:
                return "invalid_offset"
        case 4101:
diff --git a/foreign/go/go.mod b/foreign/go/go.mod
index d45cf6ff..9df789a9 100644
--- a/foreign/go/go.mod
+++ b/foreign/go/go.mod
@@ -1,8 +1,8 @@
 module github.com/iggy-rs/iggy-go-client
 
-go 1.22.0
+go 1.23.0
 
-toolchain go1.22.4
+toolchain go1.23.1
 
 require (
        github.com/google/uuid v1.6.0
@@ -17,9 +17,9 @@ require (
        github.com/google/go-cmp v0.6.0 // indirect
        github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
        github.com/kr/pretty v0.3.0 // indirect
-       golang.org/x/net v0.33.0 // indirect
-       golang.org/x/sys v0.28.0 // indirect
-       golang.org/x/text v0.21.0 // indirect
+       golang.org/x/net v0.38.0 // indirect
+       golang.org/x/sys v0.31.0 // indirect
+       golang.org/x/text v0.23.0 // indirect
        golang.org/x/tools v0.28.0 // indirect
        gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
        gopkg.in/yaml.v3 v3.0.1 // indirect
diff --git a/foreign/go/go.sum b/foreign/go/go.sum
index 3980068d..cb576a09 100644
--- a/foreign/go/go.sum
+++ b/foreign/go/go.sum
@@ -30,12 +30,12 @@ github.com/rogpeppe/go-internal v1.6.1 
h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBO
 github.com/rogpeppe/go-internal v1.6.1/go.mod 
h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
 github.com/stretchr/testify v1.8.4 
h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
 github.com/stretchr/testify v1.8.4/go.mod 
h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
-golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
-golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
-golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
-golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
-golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
-golang.org/x/text v0.21.0/go.mod 
h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
+golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
+golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
+golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
+golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
+golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
+golang.org/x/text v0.23.0/go.mod 
h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
 golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8=
 golang.org/x/tools v0.28.0/go.mod 
h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw=
 google.golang.org/protobuf v1.36.1 
h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
diff --git a/foreign/go/samples/consumer/consumer.go 
b/foreign/go/samples/consumer/consumer.go
index 2cc03af8..81373039 100644
--- a/foreign/go/samples/consumer/consumer.go
+++ b/foreign/go/samples/consumer/consumer.go
@@ -138,11 +138,11 @@ func ConsumeMessages(messageStream MessageStream) error {
        }
 }
 
-func HandleMessage(messageResponse MessageResponse) error {
-       length := (len(messageResponse.Payload) * 3) / 4
+func HandleMessage(iggyMessage IggyMessage) error {
+       length := (len(iggyMessage.Payload) * 3) / 4
        bytes := make([]byte, length)
 
-       str := string(messageResponse.Payload)
+       str := string(iggyMessage.Payload)
        isBase64 := false
 
        if _, err := base64.StdEncoding.Decode(bytes, []byte(str)); err == nil {
@@ -162,12 +162,12 @@ func HandleMessage(messageResponse MessageResponse) error 
{
                        return err
                }
        } else {
-               if err := json.Unmarshal([]byte(messageResponse.Payload), 
&envelope); err != nil {
+               if err := json.Unmarshal([]byte(iggyMessage.Payload), 
&envelope); err != nil {
                        return err
                }
        }
 
-       fmt.Printf("Handling message type: %s at offset: %d with message Id: %s 
", envelope.MessageType, messageResponse.Offset, messageResponse.Id)
+       fmt.Printf("Handling message type: %s at offset: %d with message Id: %s 
", envelope.MessageType, iggyMessage.Header.Offset, iggyMessage.Header.Id)
 
        switch envelope.MessageType {
        case "order_created":
diff --git a/foreign/go/samples/producer/producer.go 
b/foreign/go/samples/producer/producer.go
index 6f0b2d74..103f33af 100644
--- a/foreign/go/samples/producer/producer.go
+++ b/foreign/go/samples/producer/producer.go
@@ -109,15 +109,20 @@ func PublishMessages(messageStream MessageStream) error {
 
        for {
                var debugMessages []sharedDemoContracts.ISerializableMessage
-               var messages []Message
+               var messages []IggyMessage
 
                for i := 0; i < MessageBatchCount; i++ {
                        message := messageGenerator.GenerateMessage()
                        json := message.ToBytes()
 
                        debugMessages = append(debugMessages, message)
-                       messages = append(messages, Message{
-                               Id:      uuid.New(),
+                       messages = append(messages, IggyMessage{
+                               Header: MessageHeader{
+                                       Id:               uuid.New(),
+                                       OriginTimestamp:  
uint64(time.Now().UnixMicro()),
+                                       UserHeaderLength: 0,
+                                       PayloadLength:    uint32(len(json)),
+                               },
                                Payload: json,
                        })
                }
@@ -129,6 +134,7 @@ func PublishMessages(messageStream MessageStream) error {
                        Partitioning: PartitionId(Partition),
                })
                if err != nil {
+                       fmt.Printf("%s", err)
                        return nil
                }
 
diff --git a/foreign/go/tcp/tcp_core.go b/foreign/go/tcp/tcp_core.go
index e625fac5..d31b00b0 100644
--- a/foreign/go/tcp/tcp_core.go
+++ b/foreign/go/tcp/tcp_core.go
@@ -135,6 +135,7 @@ func (tms *IggyTcpClient) sendAndFetchResponse(message 
[]byte, command CommandCo
                        responseCode == 51 ||
                        responseCode == 5001 ||
                        responseCode == 5004 {
+                       // do nothing
                } else {
                        return nil, ierror.MapFromCode(responseCode)
                }


Reply via email to