This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new c4373346 chore(sdk): support zero copy schema in Go SDK (#1897)
c4373346 is described below
commit c4373346a2d85c67456b956975566d18b3e3a504
Author: Assan Kozhin <[email protected]>
AuthorDate: Mon Jun 23 00:27:05 2025 +0800
chore(sdk): support zero copy schema in Go SDK (#1897)
Fix the code separation between the binarySerialization and contract
submodules. The binary format of the message header is treated as part
of the contract for now; this can easily be changed later if there is
disagreement.
Add more tests to cover serialization: one test was added and two were
fixed. The scope of this PR is deliberately kept small.
---------
Co-authored-by: Piotr Gankiewicz <[email protected]>
---
foreign/go/README.md | 3 +
.../go/benchmarks/send_messages_benchmark_test.go | 6 +-
.../binary_response_deserializer.go | 142 ++++---------------
.../create_stream_serializer.go | 1 +
.../fetch_messages_request_serializer.go | 1 +
.../fetch_messages_request_serializer_test.go | 95 +++++++------
.../send_messages_request_serializer.go | 119 ++++++++--------
.../send_messages_request_serializer_test.go | 82 +++++++++++
foreign/go/contracts/message_header.go | 88 ++++++++----
foreign/go/contracts/message_polling.go | 2 +-
foreign/go/contracts/messages.go | 64 +++++----
foreign/go/contracts/user_headers.go | 152 +++++++++++++++++++++
foreign/go/e2e/tcp_test/messages_feature_send.go | 2 +-
foreign/go/e2e/tcp_test/messages_steps.go | 17 +--
foreign/go/e2e/tcp_test/tcp_suite_test.go | 2 +-
foreign/go/e2e/tcp_test/test_helpers.go | 5 +-
foreign/go/errors/errors.go | 8 ++
foreign/go/go.mod | 10 +-
foreign/go/go.sum | 12 +-
foreign/go/samples/consumer/consumer.go | 10 +-
foreign/go/samples/producer/producer.go | 12 +-
foreign/go/tcp/tcp_core.go | 1 +
22 files changed, 519 insertions(+), 315 deletions(-)
diff --git a/foreign/go/README.md b/foreign/go/README.md
new file mode 100644
index 00000000..77ebee70
--- /dev/null
+++ b/foreign/go/README.md
@@ -0,0 +1,3 @@
+# Tests
+
+Use ginkgo to run e2e tests.
diff --git a/foreign/go/benchmarks/send_messages_benchmark_test.go
b/foreign/go/benchmarks/send_messages_benchmark_test.go
index fd43c879..74261c1a 100644
--- a/foreign/go/benchmarks/send_messages_benchmark_test.go
+++ b/foreign/go/benchmarks/send_messages_benchmark_test.go
@@ -139,8 +139,8 @@ func cleanupInfrastructure(messageStream
iggy.MessageStream, streamId int) error
}
// CreateMessages creates messages with random payloads.
-func CreateMessages(messagesCount, messageSize int) []iggcon.Message {
- messages := make([]iggcon.Message, messagesCount)
+func CreateMessages(messagesCount, messageSize int) []iggcon.IggyMessage {
+ messages := make([]iggcon.IggyMessage, messagesCount)
for i := 0; i < messagesCount; i++ {
payload := make([]byte, messageSize)
for j := 0; j < messageSize; j++ {
@@ -148,7 +148,7 @@ func CreateMessages(messagesCount, messageSize int)
[]iggcon.Message {
}
id, _ := uuid.NewUUID()
- messages[i] = iggcon.Message{Id: id, Payload: payload}
+ messages[i] = iggcon.NewIggyMessage(id, payload)
}
return messages
}
diff --git a/foreign/go/binary_serialization/binary_response_deserializer.go
b/foreign/go/binary_serialization/binary_response_deserializer.go
index 806eab1a..c4cd5324 100644
--- a/foreign/go/binary_serialization/binary_response_deserializer.go
+++ b/foreign/go/binary_serialization/binary_response_deserializer.go
@@ -23,9 +23,7 @@ import (
"fmt"
"time"
- "github.com/google/uuid"
. "github.com/iggy-rs/iggy-go-client/contracts"
- iggcon "github.com/iggy-rs/iggy-go-client/contracts"
ierror "github.com/iggy-rs/iggy-go-client/errors"
"github.com/klauspost/compress/s2"
)
@@ -114,44 +112,42 @@ func DeserializeFetchMessagesResponse(payload []byte,
compression IggyMessageCom
return &FetchMessagesResponse{
PartitionId: 0,
CurrentOffset: 0,
- Messages: make([]MessageResponse, 0),
+ Messages: make([]IggyMessage, 0),
}, nil
}
- propertiesSize := 45
length := len(payload)
- partitionId := int(binary.LittleEndian.Uint32(payload[0:4]))
+ partitionId := binary.LittleEndian.Uint32(payload[0:4])
currentOffset := binary.LittleEndian.Uint64(payload[4:12])
- messagesCount := int(binary.LittleEndian.Uint32(payload[12:16]))
+ messagesCount := binary.LittleEndian.Uint32(payload[12:16])
position := 16
- var messages = make([]MessageResponse, 0)
+ var messages = make([]IggyMessage, 0)
for position < length {
- offset := binary.LittleEndian.Uint64(payload[position :
position+8])
- state, err := mapMessageState(payload[position+8])
- timestamp := binary.LittleEndian.Uint64(payload[position+9 :
position+17])
- id, err := uuid.FromBytes(payload[position+17 : position+33])
- checksum := binary.LittleEndian.Uint32(payload[position+33 :
position+37])
- headersLength :=
int(binary.LittleEndian.Uint32(payload[position+37 : position+41]))
- headers, err := deserializeHeaders(payload[position+41 :
position+41+headersLength])
+ if position+MessageHeaderSize >= length {
+ // body needs to be at least 1 byte
+ break
+ }
+ header, err := MessageHeaderFromBytes(payload[position :
position+MessageHeaderSize])
if err != nil {
return nil, err
}
- position += headersLength
- messageLength := binary.LittleEndian.Uint32(payload[position+41
: position+45])
-
- payloadRangeStart := position + propertiesSize
- payloadRangeEnd := payloadRangeStart + int(messageLength)
- if payloadRangeStart > length || payloadRangeEnd > length {
+ position += MessageHeaderSize
+ payload_end := position + int(header.PayloadLength)
+ if int(payload_end) > length {
break
}
+ payloadSlice := payload[position:payload_end]
+ position = int(payload_end)
- payloadSlice := payload[payloadRangeStart:payloadRangeEnd]
- totalSize := propertiesSize + int(messageLength)
- position += totalSize
+ var user_headers []byte = nil
+ if header.UserHeaderLength > 0 {
+ user_headers = payload[position :
position+int(header.UserHeaderLength)]
+ }
+ position += int(header.UserHeaderLength)
switch compression {
- case iggcon.MESSAGE_COMPRESSION_S2,
iggcon.MESSAGE_COMPRESSION_S2_BETTER, iggcon.MESSAGE_COMPRESSION_S2_BEST:
- if messageLength < 32 {
+ case MESSAGE_COMPRESSION_S2, MESSAGE_COMPRESSION_S2_BETTER,
MESSAGE_COMPRESSION_S2_BEST:
+ if length < 32 {
break
}
payloadSlice, err = s2.Decode(nil, payloadSlice)
@@ -160,19 +156,11 @@ func DeserializeFetchMessagesResponse(payload []byte,
compression IggyMessageCom
}
}
- messages = append(messages, MessageResponse{
- Id: id,
- Payload: payloadSlice,
- Offset: offset,
- Timestamp: timestamp,
- Checksum: checksum,
- State: state,
- Headers: headers,
+ messages = append(messages, IggyMessage{
+ Header: *header,
+ Payload: payloadSlice,
+ UserHeaders: user_headers,
})
-
- if position+propertiesSize >= length {
- break
- }
}
// !TODO: Add message offset ordering
@@ -184,84 +172,6 @@ func DeserializeFetchMessagesResponse(payload []byte,
compression IggyMessageCom
}, nil
}
-func mapMessageState(state byte) (MessageState, error) {
- switch state {
- case 1:
- return MessageStateAvailable, nil
- case 10:
- return MessageStateUnavailable, nil
- case 20:
- return MessageStatePoisoned, nil
- case 30:
- return MessageStateMarkedForDeletion, nil
- default:
- return 0, errors.New("Invalid message state")
- }
-}
-
-func deserializeHeaders(payload []byte) (map[HeaderKey]HeaderValue, error) {
- headers := make(map[HeaderKey]HeaderValue)
- position := 0
-
- for position < len(payload) {
- if len(payload) <= position+4 {
- return nil, errors.New("Invalid header key length")
- }
-
- keyLength := binary.LittleEndian.Uint32(payload[position :
position+4])
- if keyLength == 0 || 255 < keyLength {
- return nil, errors.New("Key has incorrect size, must be
between 1 and 255")
- }
- position += 4
-
- if len(payload) < position+int(keyLength) {
- return nil, errors.New("Invalid header key")
- }
-
- key := string(payload[position : position+int(keyLength)])
- position += int(keyLength)
-
- headerKind, err := deserializeHeaderKind(payload, position)
- if err != nil {
- return nil, err
- }
- position++
-
- if len(payload) <= position+4 {
- return nil, errors.New("Invalid header value length")
- }
-
- valueLength := binary.LittleEndian.Uint32(payload[position :
position+4])
- position += 4
-
- if valueLength == 0 || 255 < valueLength {
- return nil, errors.New("Value has incorrect size, must
be between 1 and 255")
- }
-
- if len(payload) < position+int(valueLength) {
- return nil, errors.New("Invalid header value")
- }
-
- value := payload[position : position+int(valueLength)]
- position += int(valueLength)
-
- headers[HeaderKey{Value: key}] = HeaderValue{
- Kind: headerKind,
- Value: value,
- }
- }
-
- return headers, nil
-}
-
-func deserializeHeaderKind(payload []byte, position int) (HeaderKind, error) {
- if position >= len(payload) {
- return 0, errors.New("Invalid header kind position")
- }
-
- return HeaderKind(payload[position]), nil
-}
-
func DeserializeTopics(payload []byte) ([]TopicResponse, error) {
topics := make([]TopicResponse, 0)
length := len(payload)
@@ -383,7 +293,7 @@ func DeserializeToConsumerGroup(payload []byte, position
int) (*ConsumerGroupRes
func DeserializeUsers(payload []byte) ([]*UserResponse, error) {
if len(payload) == 0 {
- return nil, errors.New("Empty payload")
+ return nil, errors.New("empty payload")
}
var result []*UserResponse
diff --git a/foreign/go/binary_serialization/create_stream_serializer.go
b/foreign/go/binary_serialization/create_stream_serializer.go
index a73596b8..d32cdfbd 100644
--- a/foreign/go/binary_serialization/create_stream_serializer.go
+++ b/foreign/go/binary_serialization/create_stream_serializer.go
@@ -19,6 +19,7 @@ package binaryserialization
import (
"encoding/binary"
+
iggcon "github.com/iggy-rs/iggy-go-client/contracts"
)
diff --git
a/foreign/go/binary_serialization/fetch_messages_request_serializer.go
b/foreign/go/binary_serialization/fetch_messages_request_serializer.go
index 2aefbbb8..9630432e 100644
--- a/foreign/go/binary_serialization/fetch_messages_request_serializer.go
+++ b/foreign/go/binary_serialization/fetch_messages_request_serializer.go
@@ -19,6 +19,7 @@ package binaryserialization
import (
"encoding/binary"
+
iggcon "github.com/iggy-rs/iggy-go-client/contracts"
)
diff --git
a/foreign/go/binary_serialization/fetch_messages_request_serializer_test.go
b/foreign/go/binary_serialization/fetch_messages_request_serializer_test.go
index 3b5e2e9e..3d20f4b1 100644
--- a/foreign/go/binary_serialization/fetch_messages_request_serializer_test.go
+++ b/foreign/go/binary_serialization/fetch_messages_request_serializer_test.go
@@ -17,48 +17,59 @@
package binaryserialization
-//func TestSerialize_TcpFetchMessagesRequest(t *testing.T) {
-// // Create a sample TcpFetchMessagesRequest
-// request := TcpFetchMessagesRequest{
-// FetchMessagesRequest: iggcon.FetchMessagesRequest{
-// Consumer: iggcon.Consumer{
-// Kind: iggcon.ConsumerSingle,
-// Id: 42,
-// },
-// StreamId: iggcon.NewIdentifier("test_stream_id"),
-// TopicId: iggcon.NewIdentifier("test_topic_id"),
-// PartitionId: 123,
-// PollingStrategy: iggcon.FirstPollingStrategy(),
-// Count: 100,
-// AutoCommit: true,
-// },
-// }
-//
-// // Serialize the request
-// serialized := request.Serialize()
-//
-// // Expected serialized bytes based on the provided sample request
-// expected := []byte{
-// 0x01, // Consumer Kind
-// 0x2A, 0x00, 0x00, 0x00, // Consumer ID
-// 0x02,
// StreamId Kind (StringId)
-// 0x0E,
// StreamId Length (14)
-// 0x74, 0x65, 0x73, 0x74, 0x5F, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6D, 0x5F, 0x69, 0x64, // StreamId
-// 0x02,
// TopicId Kind (StringId)
-// 0x0D,
// TopicId Length (13)
-// 0x74, 0x65, 0x73, 0x74, 0x5F, 0x74, 0x6F, 0x70, 0x69, 0x63,
0x5F, 0x69, 0x64, // TopicId
-// 0x7B, 0x00, 0x00, 0x00, // PartitionId (123)
-// 0x03, //
PollingStrategy Kind
-// 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, //
PollingStrategy Value (0)
-// 0x64, 0x00, 0x00, 0x00, // Count (100)
-// 0x01, // AutoCommit
-// }
-//
-// // Check if the serialized bytes match the expected bytes
-// if !areBytesEqual(serialized, expected) {
-// t.Errorf("Serialized bytes are incorrect.
\nExpected:\t%v\nGot:\t\t%v", expected, serialized)
-// }
-//}
+import (
+ "testing"
+
+ iggcon "github.com/iggy-rs/iggy-go-client/contracts"
+)
+
+func TestSerialize_TcpFetchMessagesRequest(t *testing.T) {
+ // Create a sample TcpFetchMessagesRequest
+ request := TcpFetchMessagesRequest{
+ FetchMessagesRequest: iggcon.FetchMessagesRequest{
+ Consumer: iggcon.Consumer{
+ Kind: iggcon.ConsumerSingle,
+ Id: iggcon.NewIdentifier(42),
+ },
+ StreamId: iggcon.NewIdentifier("test_stream_id"),
+ TopicId: iggcon.NewIdentifier("test_topic_id"),
+ PartitionId: 123,
+ PollingStrategy: iggcon.FirstPollingStrategy(),
+ Count: 100,
+ AutoCommit: true,
+ },
+ }
+
+ // Serialize the request
+ serialized := request.Serialize()
+
+ // Expected serialized bytes based on the provided sample request
+ expected := []byte{
+ 0x01, // Consumer Kind
+ 0x01, // ConsumerId Kind (NumericId)
+ 0x04, // ConsumerId Length (4)
+ 0x2A, 0x00, 0x0, 0x0, // ConsumerId
+
+ 0x02,
// StreamId Kind (StringId)
+ 0x0E,
// StreamId Length (14)
+ 0x74, 0x65, 0x73, 0x74, 0x5F, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6D, 0x5F, 0x69, 0x64, // StreamId
+
+ 0x02,
// TopicId Kind (StringId)
+ 0x0D,
// TopicId Length (13)
+ 0x74, 0x65, 0x73, 0x74, 0x5F, 0x74, 0x6F, 0x70, 0x69, 0x63,
0x5F, 0x69, 0x64, // TopicId
+
+ 0x7B, 0x00, 0x00, 0x00, // PartitionId (123)
+ 0x03, //
PollingStrategy Kind
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, //
PollingStrategy Value (0)
+ 0x64, 0x00, 0x00, 0x00, // Count (100)
+ 0x01, // AutoCommit
+ }
+
+ // Check if the serialized bytes match the expected bytes
+ if !areBytesEqual(serialized, expected) {
+ t.Errorf("Serialized bytes are incorrect.
\nExpected:\t%v\nGot:\t\t%v", expected, serialized)
+ }
+}
func areBytesEqual(a, b []byte) bool {
if len(a) != len(b) {
diff --git
a/foreign/go/binary_serialization/send_messages_request_serializer.go
b/foreign/go/binary_serialization/send_messages_request_serializer.go
index 6288026b..f7a3a204 100644
--- a/foreign/go/binary_serialization/send_messages_request_serializer.go
+++ b/foreign/go/binary_serialization/send_messages_request_serializer.go
@@ -28,6 +28,8 @@ type TcpSendMessagesRequest struct {
iggcon.SendMessagesRequest
}
+const indexSize = 16
+
func (request *TcpSendMessagesRequest) Serialize(compression
iggcon.IggyMessageCompression) []byte {
for i, message := range request.Messages {
switch compression {
@@ -36,94 +38,91 @@ func (request *TcpSendMessagesRequest)
Serialize(compression iggcon.IggyMessageC
break
}
request.Messages[i].Payload = s2.Encode(nil,
message.Payload)
+ message.Header.PayloadLength =
uint32(len(message.Payload))
case iggcon.MESSAGE_COMPRESSION_S2_BETTER:
if len(message.Payload) < 32 {
break
}
request.Messages[i].Payload = s2.EncodeBetter(nil,
message.Payload)
+ message.Header.PayloadLength =
uint32(len(message.Payload))
case iggcon.MESSAGE_COMPRESSION_S2_BEST:
if len(message.Payload) < 32 {
break
}
request.Messages[i].Payload = s2.EncodeBest(nil,
message.Payload)
+ message.Header.PayloadLength =
uint32(len(message.Payload))
}
}
- streamTopicIdLength := 2 + request.StreamId.Length + 2 +
request.TopicId.Length
+ streamIdFieldSize := 2 + request.StreamId.Length
+ topicIdFieldSize := 2 + request.TopicId.Length
+ partitioningFieldSize := 2 + request.Partitioning.Length
+ metadataLenFieldSize := 4 // uint32
+ messageCount := len(request.Messages)
+ messagesCountFieldSize := 4 // uint32
+ metadataLen := streamIdFieldSize +
+ topicIdFieldSize +
+ partitioningFieldSize +
+ messagesCountFieldSize
+ indexesSize := messageCount * indexSize
messageBytesCount := calculateMessageBytesCount(request.Messages)
- totalSize := streamTopicIdLength + messageBytesCount +
request.Partitioning.Length + 2
+ totalSize := metadataLenFieldSize +
+ streamIdFieldSize +
+ topicIdFieldSize +
+ partitioningFieldSize +
+ messagesCountFieldSize +
+ indexesSize +
+ messageBytesCount
+
bytes := make([]byte, totalSize)
+
position := 0
+
+ //metadata
+ binary.LittleEndian.PutUint32(bytes[:4], uint32(metadataLen))
+ position = 4
//ids
- copy(bytes[position:2+request.StreamId.Length],
SerializeIdentifier(request.StreamId))
- copy(bytes[position+2+request.StreamId.Length:streamTopicIdLength],
SerializeIdentifier(request.TopicId))
- position = streamTopicIdLength
+ copy(bytes[position:position+streamIdFieldSize],
SerializeIdentifier(request.StreamId))
+
copy(bytes[position+streamIdFieldSize:position+streamIdFieldSize+topicIdFieldSize],
SerializeIdentifier(request.TopicId))
+ position += streamIdFieldSize + topicIdFieldSize
//partitioning
- bytes[streamTopicIdLength] = byte(request.Partitioning.Kind)
- bytes[streamTopicIdLength+1] = byte(request.Partitioning.Length)
-
copy(bytes[streamTopicIdLength+2:streamTopicIdLength+2+request.Partitioning.Length],
[]byte(request.Partitioning.Value))
- position = streamTopicIdLength + 2 + request.Partitioning.Length
-
- emptyHeaders := make([]byte, 4)
+ bytes[position] = byte(request.Partitioning.Kind)
+ bytes[position+1] = byte(request.Partitioning.Length)
+ copy(bytes[position+2:position+partitioningFieldSize],
[]byte(request.Partitioning.Value))
+ position += partitioningFieldSize
+ binary.LittleEndian.PutUint32(bytes[position:position+4],
uint32(messageCount))
+ position += 4
+
+ currentIndexPosition := position
+ for i := 0; i < indexesSize; i++ {
+ bytes[position+i] = 0
+ }
+ position += indexesSize
+ msgSize := uint32(0)
for _, message := range request.Messages {
- copy(bytes[position:position+16], message.Id[:])
- if message.Headers != nil {
- headersBytes := getHeadersBytes(message.Headers)
-
binary.LittleEndian.PutUint32(bytes[position+16:position+20],
uint32(len(headersBytes)))
- copy(bytes[position+20:position+20+len(headersBytes)],
headersBytes)
- position += len(headersBytes) + 20
- } else {
- copy(bytes[position+16:position+16+len(emptyHeaders)],
emptyHeaders)
- position += 20
- }
-
- binary.LittleEndian.PutUint32(bytes[position:position+4],
uint32(len(message.Payload)))
- copy(bytes[position+4:position+4+len(message.Payload)],
message.Payload)
- position += len(message.Payload) + 4
+ copy(bytes[position:position+iggcon.MessageHeaderSize],
message.Header.ToBytes())
+
copy(bytes[position+iggcon.MessageHeaderSize:position+iggcon.MessageHeaderSize+int(message.Header.PayloadLength)],
message.Payload)
+ position += iggcon.MessageHeaderSize +
int(message.Header.PayloadLength)
+
copy(bytes[position:position+int(message.Header.UserHeaderLength)],
message.UserHeaders)
+ position += int(message.Header.UserHeaderLength)
+
+ msgSize += iggcon.MessageHeaderSize +
message.Header.PayloadLength + message.Header.UserHeaderLength
+
+
binary.LittleEndian.PutUint32(bytes[currentIndexPosition:currentIndexPosition+4],
0)
+
binary.LittleEndian.PutUint32(bytes[currentIndexPosition+4:currentIndexPosition+8],
uint32(msgSize))
+
binary.LittleEndian.PutUint32(bytes[currentIndexPosition+8:currentIndexPosition+12],
0)
+ currentIndexPosition += indexSize
}
return bytes
}
-func calculateMessageBytesCount(messages []iggcon.Message) int {
+func calculateMessageBytesCount(messages []iggcon.IggyMessage) int {
count := 0
for _, msg := range messages {
- count += 16 + 4 + len(msg.Payload) + 4
- for key, header := range msg.Headers {
- count += 4 + len(key.Value) + 1 + 4 + len(header.Value)
- }
+ count += iggcon.MessageHeaderSize + len(msg.Payload) +
len(msg.UserHeaders)
}
return count
}
-
-func getHeadersBytes(headers map[iggcon.HeaderKey]iggcon.HeaderValue) []byte {
- headersLength := 0
- for key, header := range headers {
- headersLength += 4 + len(key.Value) + 1 + 4 + len(header.Value)
- }
- headersBytes := make([]byte, headersLength)
- position := 0
- for key, value := range headers {
- headerBytes := getBytesFromHeader(key, value)
- copy(headersBytes[position:position+len(headerBytes)],
headerBytes)
- position += len(headerBytes)
- }
- return headersBytes
-}
-
-func getBytesFromHeader(key iggcon.HeaderKey, value iggcon.HeaderValue) []byte
{
- headerBytesLength := 4 + len(key.Value) + 1 + 4 + len(value.Value)
- headerBytes := make([]byte, headerBytesLength)
-
- binary.LittleEndian.PutUint32(headerBytes[:4], uint32(len(key.Value)))
- copy(headerBytes[4:4+len(key.Value)], key.Value)
-
- headerBytes[4+len(key.Value)] = byte(value.Kind)
-
-
binary.LittleEndian.PutUint32(headerBytes[4+len(key.Value)+1:4+len(key.Value)+1+4],
uint32(len(value.Value)))
- copy(headerBytes[4+len(key.Value)+1+4:], value.Value)
-
- return headerBytes
-}
diff --git
a/foreign/go/binary_serialization/send_messages_request_serializer_test.go
b/foreign/go/binary_serialization/send_messages_request_serializer_test.go
new file mode 100644
index 00000000..f28ff19d
--- /dev/null
+++ b/foreign/go/binary_serialization/send_messages_request_serializer_test.go
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package binaryserialization
+
+import (
+ "testing"
+
+ "github.com/google/uuid"
+ iggcon "github.com/iggy-rs/iggy-go-client/contracts"
+)
+
+func TestSerialize_SendMessagesRequest(t *testing.T) {
+ message1 := generateTestMessage("data1")
+ request := TcpSendMessagesRequest{
+ SendMessagesRequest: iggcon.SendMessagesRequest{
+ StreamId: iggcon.NewIdentifier("test_stream_id"),
+ TopicId: iggcon.NewIdentifier("test_topic_id"),
+ Partitioning: iggcon.PartitionId(1),
+ Messages: []iggcon.IggyMessage{
+ message1,
+ },
+ },
+ }
+
+ // Serialize the request
+ serialized := request.Serialize(iggcon.MESSAGE_COMPRESSION_NONE)
+
+ // Expected serialized bytes based on the provided sample request
+ expected := []byte{
+ 0x29, 0x0, 0x0, 0x0, // metadataLength
+ 0x02,
// StreamId Kind (StringId)
+ 0x0E,
// StreamId Length (14)
+ 0x74, 0x65, 0x73, 0x74, 0x5F, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6D, 0x5F, 0x69, 0x64, // StreamId
+
+ 0x02,
// TopicId Kind (StringId)
+ 0x0D,
// TopicId Length (13)
+ 0x74, 0x65, 0x73, 0x74, 0x5F, 0x74, 0x6F, 0x70, 0x69, 0x63,
0x5F, 0x69, 0x64, // TopicId
+ 0x02, // PartitionIdKind
+ 0x04, // Partitioning Length
+ 0x01, 0x00, 0x00, 0x00, // PartitionId (123)
+ 0x01, 0x0, 0x0, 0x0, // MessageCount
+ 0, 0, 0, 0, 110, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // Index
(16*1) bytes
+ }
+ expected = append(expected, message1.Header.ToBytes()...)
+ expected = append(expected, message1.Payload...)
+ expected = append(expected, message1.UserHeaders...)
+
+ // Check if the serialized bytes match the expected bytes
+ if !areBytesEqual(serialized, expected) {
+ t.Errorf("Serialized bytes are incorrect.
\nExpected:\t%v\nGot:\t\t%v", expected, serialized)
+ }
+}
+
+func createDefaultMessageHeaders() map[iggcon.HeaderKey]iggcon.HeaderValue {
+ return map[iggcon.HeaderKey]iggcon.HeaderValue{
+ {Value: "HeaderKey1"}: {Kind: iggcon.String, Value:
[]byte("Value 1")},
+ {Value: "HeaderKey2"}: {Kind: iggcon.Uint32, Value:
[]byte{0x01, 0x02, 0x03, 0x04}},
+ }
+}
+
+func generateTestMessage(payload string) iggcon.IggyMessage {
+ return iggcon.NewIggyMessageWithHeaders(
+ uuid.New(),
+ []byte(payload),
+ createDefaultMessageHeaders(),
+ )
+}
diff --git a/foreign/go/contracts/message_header.go
b/foreign/go/contracts/message_header.go
index d33642b2..c0c76df3 100644
--- a/foreign/go/contracts/message_header.go
+++ b/foreign/go/contracts/message_header.go
@@ -17,40 +17,70 @@
package iggcon
-import "errors"
+import (
+ "encoding/binary"
+ "errors"
+ "time"
-type HeaderValue struct {
- Kind HeaderKind
- Value []byte
+ "github.com/google/uuid"
+)
+
+const MessageHeaderSize = 8 + 16 + 8 + 8 + 8 + 4 + 4
+
+type MessageHeader struct {
+ Checksum uint64 `json:"checksum"`
+ Id uuid.UUID `json:"id"`
+ Offset uint64 `json:"offset"`
+ Timestamp uint64 `json:"timestamp"`
+ OriginTimestamp uint64 `json:"origin_timestamp"`
+ UserHeaderLength uint32 `json:"user_header_length"`
+ PayloadLength uint32 `json:"payload_length"`
}
-type HeaderKey struct {
- Value string
+func NewMessageHeader(id uuid.UUID, payloadLength uint32, userHeaderLength
uint32) MessageHeader {
+ return MessageHeader{
+ Id: id,
+ OriginTimestamp: uint64(time.Now().UnixMicro()),
+ PayloadLength: payloadLength,
+ UserHeaderLength: userHeaderLength,
+ }
}
-func NewHeaderKey(val string) (HeaderKey, error) {
- if len(val) == 0 || len(val) > 255 {
- return HeaderKey{}, errors.New("Value has incorrect size, must
be between 1 and 255")
+func MessageHeaderFromBytes(data []byte) (*MessageHeader, error) {
+
+ if len(data) != MessageHeaderSize {
+ return nil, errors.New("data has incorrect size, must be 56")
}
- return HeaderKey{Value: val}, nil
+ checksum := binary.LittleEndian.Uint64(data[0:8])
+ id, _ := uuid.FromBytes(data[8:24])
+ timestamp := binary.LittleEndian.Uint64(data[24:32])
+ origin_timestamp := binary.LittleEndian.Uint64(data[32:40])
+ offset := binary.LittleEndian.Uint64(data[40:48])
+ user_header_length := binary.LittleEndian.Uint32(data[48:52])
+ payload_length := binary.LittleEndian.Uint32(data[52:56])
+
+ return &MessageHeader{
+ Checksum: checksum,
+ Id: id,
+ Offset: offset,
+ Timestamp: timestamp,
+ OriginTimestamp: origin_timestamp,
+ UserHeaderLength: user_header_length,
+ PayloadLength: payload_length,
+ }, nil
}
-type HeaderKind int
-
-const (
- Raw HeaderKind = 1
- String HeaderKind = 2
- Bool HeaderKind = 3
- Int8 HeaderKind = 4
- Int16 HeaderKind = 5
- Int32 HeaderKind = 6
- Int64 HeaderKind = 7
- Int128 HeaderKind = 8
- Uint8 HeaderKind = 9
- Uint16 HeaderKind = 10
- Uint32 HeaderKind = 11
- Uint64 HeaderKind = 12
- Uint128 HeaderKind = 13
- Float HeaderKind = 14
- Double HeaderKind = 15
-)
+func (mh *MessageHeader) ToBytes() []byte {
+ bytes := make([]byte, 0, MessageHeaderSize)
+
+ bytes = binary.LittleEndian.AppendUint64(bytes, mh.Checksum)
+ idBytes, _ := uuid.UUID.MarshalBinary(mh.Id)
+ bytes = append(bytes, idBytes...)
+ bytes = binary.LittleEndian.AppendUint64(bytes, mh.Offset)
+ bytes = binary.LittleEndian.AppendUint64(bytes, mh.Timestamp)
+ bytes = binary.LittleEndian.AppendUint64(bytes, mh.OriginTimestamp)
+ bytes = binary.LittleEndian.AppendUint32(bytes, mh.UserHeaderLength)
+ bytes = binary.LittleEndian.AppendUint32(bytes, mh.PayloadLength)
+
+ return bytes
+}
diff --git a/foreign/go/contracts/message_polling.go
b/foreign/go/contracts/message_polling.go
index 67cd37aa..df9bdad9 100644
--- a/foreign/go/contracts/message_polling.go
+++ b/foreign/go/contracts/message_polling.go
@@ -22,7 +22,7 @@ type PollingStrategy struct {
Value uint64
}
-type MessagePolling int
+type MessagePolling byte
const (
POLLING_OFFSET MessagePolling = 1
diff --git a/foreign/go/contracts/messages.go b/foreign/go/contracts/messages.go
index fb1e9b3d..b4ebe657 100644
--- a/foreign/go/contracts/messages.go
+++ b/foreign/go/contracts/messages.go
@@ -32,48 +32,46 @@ type FetchMessagesRequest struct {
}
type FetchMessagesResponse struct {
- PartitionId int
+ PartitionId uint32
CurrentOffset uint64
- Messages []MessageResponse
- MessageCount int
+ MessageCount uint32
+ Messages []IggyMessage
}
-type MessageResponse struct {
- Offset uint64 `json:"offset"`
- Timestamp uint64 `json:"timestamp"`
- Checksum uint32 `json:"checksum"`
- Id uuid.UUID `json:"id"`
- Payload []byte `json:"payload"`
- Headers map[HeaderKey]HeaderValue `json:"headers,omitempty"`
- State MessageState `json:"state"`
+type SendMessagesRequest struct {
+ StreamId Identifier `json:"streamId"`
+ TopicId Identifier `json:"topicId"`
+ Partitioning Partitioning `json:"partitioning"`
+ Messages []IggyMessage `json:"messages"`
}
-type MessageState int
-
-const (
- MessageStateAvailable MessageState = iota
- MessageStateUnavailable
- MessageStatePoisoned
- MessageStateMarkedForDeletion
-)
-
-type SendMessagesRequest struct {
- StreamId Identifier `json:"streamId"`
- TopicId Identifier `json:"topicId"`
- Partitioning Partitioning `json:"partitioning"`
- Messages []Message `json:"messages"`
+type ReceivedMessage struct {
+ Message IggyMessage
+ CurrentOffset uint64
+ PartitionId uint32
}
-type Message struct {
- Id uuid.UUID
- Payload []byte
- Headers map[HeaderKey]HeaderValue
+type IggyMessage struct {
+ Header MessageHeader
+ Payload []byte
+ UserHeaders []byte
}
-func NewMessage(payload []byte, headers map[HeaderKey]HeaderValue) Message {
- return Message{
- Id: uuid.New(),
+func NewIggyMessage(id uuid.UUID, payload []byte) IggyMessage {
+ return IggyMessage{
+ Header: NewMessageHeader(id, uint32(len(payload)), 0),
Payload: payload,
- Headers: headers,
}
}
+
+func NewIggyMessageWithHeaders(id uuid.UUID, payload []byte, userHeaders
map[HeaderKey]HeaderValue) IggyMessage {
+ userHeaderBytes := GetHeadersBytes(userHeaders)
+ messageHeader := NewMessageHeader(id, uint32(len(payload)), 0)
+ messageHeader.UserHeaderLength = uint32(len(userHeaderBytes))
+ iggyMessage := IggyMessage{
+ Header: messageHeader,
+ Payload: payload,
+ UserHeaders: userHeaderBytes,
+ }
+ return iggyMessage
+}
diff --git a/foreign/go/contracts/user_headers.go
b/foreign/go/contracts/user_headers.go
new file mode 100644
index 00000000..3349c895
--- /dev/null
+++ b/foreign/go/contracts/user_headers.go
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iggcon
+
+import (
+ "encoding/binary"
+ "errors"
+)
+
+// HeaderValue is the value half of a user header: a kind tag plus the raw
+// bytes of the value. Nothing in this file interprets Value according to Kind;
+// the bytes are serialized and deserialized as-is.
+type HeaderValue struct {
+ Kind HeaderKind
+ Value []byte
+}
+
+// HeaderKey is the key of a user header. It wraps a plain string so it can be
+// used as a map key. NewHeaderKey enforces the 1 to 255 byte size rule; a
+// HeaderKey built as a struct literal bypasses that check.
+type HeaderKey struct {
+ Value string
+}
+
+// NewHeaderKey validates val and wraps it in a HeaderKey. The length is
+// measured in bytes (len of the string), not in runes. It returns an error and
+// the zero HeaderKey when val is empty or longer than 255 bytes.
+func NewHeaderKey(val string) (HeaderKey, error) {
+ if len(val) == 0 || len(val) > 255 {
+ return HeaderKey{}, errors.New("value has incorrect size, must
be between 1 and 255")
+ }
+ return HeaderKey{Value: val}, nil
+}
+
+// HeaderKind tags the type of a user header value. It is declared as int but
+// is written to and read from the wire as a single byte (see
+// getBytesFromHeader and deserializeHeaderKind).
+type HeaderKind int
+
+// Known header kinds. The numeric values are written to the wire as-is, so
+// they must not be renumbered. 0 is not assigned to any kind.
+const (
+ Raw HeaderKind = 1
+ String HeaderKind = 2
+ Bool HeaderKind = 3
+ Int8 HeaderKind = 4
+ Int16 HeaderKind = 5
+ Int32 HeaderKind = 6
+ Int64 HeaderKind = 7
+ Int128 HeaderKind = 8
+ Uint8 HeaderKind = 9
+ Uint16 HeaderKind = 10
+ Uint32 HeaderKind = 11
+ Uint64 HeaderKind = 12
+ Uint128 HeaderKind = 13
+ Float HeaderKind = 14
+ Double HeaderKind = 15
+)
+
+// GetHeadersBytes serializes a set of user headers into one contiguous byte
+// slice. Each header is encoded by getBytesFromHeader as:
+//
+//	4 bytes  key length (little endian)
+//	n bytes  key
+//	1 byte   kind
+//	4 bytes  value length (little endian)
+//	m bytes  value
+//
+// A nil or empty map yields an empty, non-nil slice.
+//
+// NOTE(review): headers are written in Go map iteration order, which is not
+// stable, so two calls with the same map may return the entries in a different
+// order. No size validation happens here, while DeserializeHeaders rejects
+// value lengths outside 1..255, so a header with an empty or oversized value
+// serializes fine but cannot be read back.
+func GetHeadersBytes(headers map[HeaderKey]HeaderValue) []byte {
+ // First pass: total size, using the same per-header formula as
+ // getBytesFromHeader.
+ headersLength := 0
+ for key, header := range headers {
+ headersLength += 4 + len(key.Value) + 1 + 4 + len(header.Value)
+ }
+ headersBytes := make([]byte, headersLength)
+ position := 0
+ // Second pass: encode each header and append it at the running offset.
+ for key, value := range headers {
+ headerBytes := getBytesFromHeader(key, value)
+ copy(headersBytes[position:position+len(headerBytes)],
headerBytes)
+ position += len(headerBytes)
+ }
+ return headersBytes
+}
+
+// getBytesFromHeader encodes one header as: 4-byte little-endian key length,
+// key bytes, 1-byte kind, 4-byte little-endian value length, value bytes.
+// It returns a freshly allocated slice of exactly that size.
+func getBytesFromHeader(key HeaderKey, value HeaderValue) []byte {
+ headerBytesLength := 4 + len(key.Value) + 1 + 4 + len(value.Value)
+ headerBytes := make([]byte, headerBytesLength)
+
+ // Key: length prefix followed by the key bytes.
+ binary.LittleEndian.PutUint32(headerBytes[:4], uint32(len(key.Value)))
+ copy(headerBytes[4:4+len(key.Value)], key.Value)
+
+ // Kind: only the low 8 bits of the int-typed HeaderKind are kept.
+ headerBytes[4+len(key.Value)] = byte(value.Kind)
+
+ // Value: length prefix followed by the value bytes.
+
binary.LittleEndian.PutUint32(headerBytes[4+len(key.Value)+1:4+len(key.Value)+1+4],
uint32(len(value.Value)))
+ copy(headerBytes[4+len(key.Value)+1+4:], value.Value)
+
+ return headerBytes
+}
+
+// DeserializeHeaders parses the byte layout produced by GetHeadersBytes back
+// into a map. Entries are read one after another until the input is used up:
+// 4-byte little-endian key length, key, 1-byte kind, 4-byte little-endian
+// value length, value.
+//
+// It returns an error (and a nil map) when the input is truncated or when a
+// key or value length is outside 1..255. Empty input yields an empty, non-nil
+// map. If the same key occurs more than once, the last occurrence wins.
+//
+// The returned HeaderValue.Value slices are sub-slices of userHeadersBytes,
+// not copies, so they share memory with the input buffer. Keys are copied by
+// the string conversion.
+func DeserializeHeaders(userHeadersBytes []byte) (map[HeaderKey]HeaderValue,
error) {
+ headers := make(map[HeaderKey]HeaderValue)
+ position := 0
+
+ for position < len(userHeadersBytes) {
+ // Requires the 4-byte length field plus at least one more byte, so
+ // a buffer that ends right after the length field fails here.
+ if len(userHeadersBytes) <= position+4 {
+ return nil, errors.New("invalid header key length")
+ }
+
+ keyLength :=
binary.LittleEndian.Uint32(userHeadersBytes[position : position+4])
+ if keyLength == 0 || 255 < keyLength {
+ return nil, errors.New("key has incorrect size, must be
between 1 and 255")
+ }
+ position += 4
+
+ if len(userHeadersBytes) < position+int(keyLength) {
+ return nil, errors.New("invalid header key")
+ }
+
+ key := string(userHeadersBytes[position :
position+int(keyLength)])
+ position += int(keyLength)
+
+ // The helper bounds-checks position; the kind byte itself is not
+ // validated against the declared HeaderKind constants.
+ headerKind, err := deserializeHeaderKind(userHeadersBytes,
position)
+ if err != nil {
+ return nil, err
+ }
+ position++
+
+ // Same shape as the key-length check above: the length field plus
+ // at least one value byte must be present.
+ if len(userHeadersBytes) <= position+4 {
+ return nil, errors.New("invalid header value length")
+ }
+
+ valueLength :=
binary.LittleEndian.Uint32(userHeadersBytes[position : position+4])
+ position += 4
+
+ if valueLength == 0 || 255 < valueLength {
+ return nil, errors.New("value has incorrect size, must
be between 1 and 255")
+ }
+
+ if len(userHeadersBytes) < position+int(valueLength) {
+ return nil, errors.New("invalid header value")
+ }
+
+ // Sub-slice of the input: no copy is made.
+ value := userHeadersBytes[position : position+int(valueLength)]
+ position += int(valueLength)
+
+ headers[HeaderKey{Value: key}] = HeaderValue{
+ Kind: headerKind,
+ Value: value,
+ }
+ }
+
+ return headers, nil
+}
+
+// deserializeHeaderKind reads the single kind byte at payload[position]. It
+// returns an error when position is at or past the end of payload.
+// NOTE(review): the byte is converted as-is; values that match none of the
+// declared HeaderKind constants (0, or anything above 15) are accepted.
+func deserializeHeaderKind(payload []byte, position int) (HeaderKind, error) {
+ if position >= len(payload) {
+ return 0, errors.New("invalid header kind position")
+ }
+
+ return HeaderKind(payload[position]), nil
+}
diff --git a/foreign/go/e2e/tcp_test/messages_feature_send.go
b/foreign/go/e2e/tcp_test/messages_feature_send.go
index e8ed778f..41894f7f 100644
--- a/foreign/go/e2e/tcp_test/messages_feature_send.go
+++ b/foreign/go/e2e/tcp_test/messages_feature_send.go
@@ -95,7 +95,7 @@ var _ = Describe("SEND MESSAGES:", func() {
StreamId: iggcon.NewIdentifier(streamId),
TopicId: iggcon.NewIdentifier(topicId),
Partitioning:
iggcon.PartitionId(int(createRandomUInt32())),
- Messages: []iggcon.Message{},
+ Messages: []iggcon.IggyMessage{},
}
err := client.SendMessages(request)
itShouldReturnSpecificError(err,
"messages_count_should_be_greater_than_zero")
diff --git a/foreign/go/e2e/tcp_test/messages_steps.go
b/foreign/go/e2e/tcp_test/messages_steps.go
index f084ce50..56f2834d 100644
--- a/foreign/go/e2e/tcp_test/messages_steps.go
+++ b/foreign/go/e2e/tcp_test/messages_steps.go
@@ -21,6 +21,7 @@ import (
"bytes"
"reflect"
+ "github.com/google/uuid"
"github.com/iggy-rs/iggy-go-client"
iggcon "github.com/iggy-rs/iggy-go-client/contracts"
. "github.com/onsi/ginkgo/v2"
@@ -34,17 +35,17 @@ func createDefaultMessageHeaders()
map[iggcon.HeaderKey]iggcon.HeaderValue {
}
}
-func createDefaultMessages() []iggcon.Message {
+// createDefaultMessages returns the two-message batch used by the send tests.
+// Each message gets its own fresh UUID, a payload built from
+// createRandomString(256), and the headers from createDefaultMessageHeaders.
+func createDefaultMessages() []iggcon.IggyMessage {
headers := createDefaultMessageHeaders()
- messages := []iggcon.Message{
- iggcon.NewMessage([]byte(createRandomString(256)), headers),
- iggcon.NewMessage([]byte(createRandomString(256)), headers),
+ messages := []iggcon.IggyMessage{
+ iggcon.NewIggyMessageWithHeaders(uuid.New(),
[]byte(createRandomString(256)), headers),
+ iggcon.NewIggyMessageWithHeaders(uuid.New(),
[]byte(createRandomString(256)), headers),
}
return messages
}
-func itShouldSuccessfullyPublishMessages(streamId int, topicId int, messages
[]iggcon.Message, client iggy.MessageStream) {
+func itShouldSuccessfullyPublishMessages(streamId int, topicId int, messages
[]iggcon.IggyMessage, client iggy.MessageStream) {
result, err := client.PollMessages(iggcon.FetchMessagesRequest{
StreamId: iggcon.NewIdentifier(streamId),
TopicId: iggcon.NewIdentifier(topicId),
@@ -77,10 +78,10 @@ func itShouldSuccessfullyPublishMessages(streamId int,
topicId int, messages []i
})
}
-func compareMessage(resultMessages []iggcon.MessageResponse, expectedMessage
iggcon.Message) bool {
+func compareMessage(resultMessages []iggcon.IggyMessage, expectedMessage
iggcon.IggyMessage) bool {
for _, msg := range resultMessages {
- if msg.Id == expectedMessage.Id && bytes.Equal(msg.Payload,
expectedMessage.Payload) {
- if reflect.DeepEqual(msg.Headers,
expectedMessage.Headers) {
+ if msg.Header.Id == expectedMessage.Header.Id &&
bytes.Equal(msg.Payload, expectedMessage.Payload) {
+ if reflect.DeepEqual(msg.UserHeaders,
expectedMessage.UserHeaders) {
return true
}
}
diff --git a/foreign/go/e2e/tcp_test/tcp_suite_test.go
b/foreign/go/e2e/tcp_test/tcp_suite_test.go
index 9a56578e..e2e78a6f 100644
--- a/foreign/go/e2e/tcp_test/tcp_suite_test.go
+++ b/foreign/go/e2e/tcp_test/tcp_suite_test.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package tcp_test_test
+package tcp_test
import (
"testing"
diff --git a/foreign/go/e2e/tcp_test/test_helpers.go
b/foreign/go/e2e/tcp_test/test_helpers.go
index ce38fbc2..e61db65a 100644
--- a/foreign/go/e2e/tcp_test/test_helpers.go
+++ b/foreign/go/e2e/tcp_test/test_helpers.go
@@ -18,11 +18,12 @@
package tcp_test
import (
- . "github.com/iggy-rs/iggy-go-client"
- . "github.com/iggy-rs/iggy-go-client/contracts"
"math/rand"
"strings"
"time"
+
+ . "github.com/iggy-rs/iggy-go-client"
+ . "github.com/iggy-rs/iggy-go-client/contracts"
)
func createAuthorizedConnection() MessageStream {
diff --git a/foreign/go/errors/errors.go b/foreign/go/errors/errors.go
index 89710be8..0c45beaf 100644
--- a/foreign/go/errors/errors.go
+++ b/foreign/go/errors/errors.go
@@ -61,6 +61,8 @@ func TranslateErrorCode(code int) string {
return "invalid_format"
case 5:
return "feature_unavailable"
+ case 6:
+ return "invalid_identifier"
case 10:
return "cannot_create_base_directory"
case 20:
@@ -109,6 +111,8 @@ func TranslateErrorCode(code int) string {
return "cannot_parse_int"
case 204:
return "cannot_parse_slice"
+ case 206:
+ return "connection_closed"
case 300:
return "http_response_error"
case 301:
@@ -273,6 +277,10 @@ func TranslateErrorCode(code int) string {
return "invalid_message_checksum"
case 4028:
return "invalid_key_value_length"
+ case 4032:
+ return "non_zero_timestamp"
+ case 4036:
+ return "invalid_messages_size"
case 4100:
return "invalid_offset"
case 4101:
diff --git a/foreign/go/go.mod b/foreign/go/go.mod
index d45cf6ff..9df789a9 100644
--- a/foreign/go/go.mod
+++ b/foreign/go/go.mod
@@ -1,8 +1,8 @@
module github.com/iggy-rs/iggy-go-client
-go 1.22.0
+go 1.23.0
-toolchain go1.22.4
+toolchain go1.23.1
require (
github.com/google/uuid v1.6.0
@@ -17,9 +17,9 @@ require (
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
github.com/kr/pretty v0.3.0 // indirect
- golang.org/x/net v0.33.0 // indirect
- golang.org/x/sys v0.28.0 // indirect
- golang.org/x/text v0.21.0 // indirect
+ golang.org/x/net v0.38.0 // indirect
+ golang.org/x/sys v0.31.0 // indirect
+ golang.org/x/text v0.23.0 // indirect
golang.org/x/tools v0.28.0 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
diff --git a/foreign/go/go.sum b/foreign/go/go.sum
index 3980068d..cb576a09 100644
--- a/foreign/go/go.sum
+++ b/foreign/go/go.sum
@@ -30,12 +30,12 @@ github.com/rogpeppe/go-internal v1.6.1
h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBO
github.com/rogpeppe/go-internal v1.6.1/go.mod
h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/stretchr/testify v1.8.4
h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod
h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
-golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
-golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
-golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
-golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
-golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
-golang.org/x/text v0.21.0/go.mod
h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
+golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
+golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
+golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
+golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
+golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
+golang.org/x/text v0.23.0/go.mod
h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8=
golang.org/x/tools v0.28.0/go.mod
h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw=
google.golang.org/protobuf v1.36.1
h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
diff --git a/foreign/go/samples/consumer/consumer.go
b/foreign/go/samples/consumer/consumer.go
index 2cc03af8..81373039 100644
--- a/foreign/go/samples/consumer/consumer.go
+++ b/foreign/go/samples/consumer/consumer.go
@@ -138,11 +138,11 @@ func ConsumeMessages(messageStream MessageStream) error {
}
}
-func HandleMessage(messageResponse MessageResponse) error {
- length := (len(messageResponse.Payload) * 3) / 4
+func HandleMessage(iggyMessage IggyMessage) error {
+ length := (len(iggyMessage.Payload) * 3) / 4
bytes := make([]byte, length)
- str := string(messageResponse.Payload)
+ str := string(iggyMessage.Payload)
isBase64 := false
if _, err := base64.StdEncoding.Decode(bytes, []byte(str)); err == nil {
@@ -162,12 +162,12 @@ func HandleMessage(messageResponse MessageResponse) error
{
return err
}
} else {
- if err := json.Unmarshal([]byte(messageResponse.Payload),
&envelope); err != nil {
+ if err := json.Unmarshal([]byte(iggyMessage.Payload),
&envelope); err != nil {
return err
}
}
- fmt.Printf("Handling message type: %s at offset: %d with message Id: %s
", envelope.MessageType, messageResponse.Offset, messageResponse.Id)
+ fmt.Printf("Handling message type: %s at offset: %d with message Id: %s
", envelope.MessageType, iggyMessage.Header.Offset, iggyMessage.Header.Id)
switch envelope.MessageType {
case "order_created":
diff --git a/foreign/go/samples/producer/producer.go
b/foreign/go/samples/producer/producer.go
index 6f0b2d74..103f33af 100644
--- a/foreign/go/samples/producer/producer.go
+++ b/foreign/go/samples/producer/producer.go
@@ -109,15 +109,20 @@ func PublishMessages(messageStream MessageStream) error {
for {
var debugMessages []sharedDemoContracts.ISerializableMessage
- var messages []Message
+ var messages []IggyMessage
for i := 0; i < MessageBatchCount; i++ {
message := messageGenerator.GenerateMessage()
json := message.ToBytes()
debugMessages = append(debugMessages, message)
- messages = append(messages, Message{
- Id: uuid.New(),
+ messages = append(messages, IggyMessage{
+ Header: MessageHeader{
+ Id: uuid.New(),
+ OriginTimestamp:
uint64(time.Now().UnixMicro()),
+ UserHeaderLength: 0,
+ PayloadLength: uint32(len(json)),
+ },
Payload: json,
})
}
@@ -129,6 +134,7 @@ func PublishMessages(messageStream MessageStream) error {
Partitioning: PartitionId(Partition),
})
if err != nil {
+ fmt.Printf("%s", err)
return nil
}
diff --git a/foreign/go/tcp/tcp_core.go b/foreign/go/tcp/tcp_core.go
index e625fac5..d31b00b0 100644
--- a/foreign/go/tcp/tcp_core.go
+++ b/foreign/go/tcp/tcp_core.go
@@ -135,6 +135,7 @@ func (tms *IggyTcpClient) sendAndFetchResponse(message
[]byte, command CommandCo
responseCode == 51 ||
responseCode == 5001 ||
responseCode == 5004 {
+ // do nothing
} else {
return nil, ierror.MapFromCode(responseCode)
}