This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 9758c90c refactor(sdk): change int attribute to int32 and refactor
identifier. (#1995)
9758c90c is described below
commit 9758c90c2a1a587e3e7831142ccf9ed154c0a336
Author: Chengxi <[email protected]>
AuthorDate: Sat Jul 12 15:03:27 2025 -0400
refactor(sdk): change int attribute to int32 and refactor identifier.
(#1995)
1. refactor iggcon.Identifier, to enhance type safe.
2. change int => uint32 to align with rust sdk and avoid overflow.
---
bdd/go/tests/tcp_test/consumers_feature_create.go | 43 ++++++++-----
bdd/go/tests/tcp_test/consumers_feature_delete.go | 43 +++++++------
bdd/go/tests/tcp_test/consumers_feature_get_all.go | 6 +-
.../tests/tcp_test/consumers_feature_get_by_id.go | 31 ++++++----
bdd/go/tests/tcp_test/consumers_feature_join.go | 43 ++++++-------
bdd/go/tests/tcp_test/consumers_feature_leave.go | 41 ++++++------
bdd/go/tests/tcp_test/consumers_steps.go | 70 ++++++++++++---------
bdd/go/tests/tcp_test/messages_feature_send.go | 31 ++++++----
bdd/go/tests/tcp_test/messages_steps.go | 10 +--
bdd/go/tests/tcp_test/partitions_feature_create.go | 24 +++++---
bdd/go/tests/tcp_test/partitions_feature_delete.go | 22 ++++---
bdd/go/tests/tcp_test/partitions_steps.go | 8 ++-
bdd/go/tests/tcp_test/stream_feature_create.go | 12 ++--
bdd/go/tests/tcp_test/stream_feature_delete.go | 8 +--
bdd/go/tests/tcp_test/stream_feature_get_by_id.go | 6 +-
bdd/go/tests/tcp_test/stream_feature_update.go | 15 ++---
bdd/go/tests/tcp_test/stream_steps.go | 35 ++++++-----
bdd/go/tests/tcp_test/test_helpers.go | 14 ++++-
bdd/go/tests/tcp_test/topic_feature_create.go | 31 ++++++----
bdd/go/tests/tcp_test/topic_feature_delete.go | 15 +++--
bdd/go/tests/tcp_test/topic_feature_get_all.go | 5 +-
bdd/go/tests/tcp_test/topic_feature_get_by_id.go | 10 +--
bdd/go/tests/tcp_test/topic_feature_update.go | 34 +++++-----
bdd/go/tests/tcp_test/topic_steps.go | 36 ++++++-----
bdd/go/tests/tcp_test/users_feature_create.go | 6 +-
bdd/go/tests/tcp_test/users_feature_delete.go | 8 +--
bdd/go/tests/tcp_test/users_feature_get_all.go | 4 +-
bdd/go/tests/tcp_test/users_feature_get_by_id.go | 5 +-
bdd/go/tests/tcp_test/users_feature_password.go | 7 ++-
bdd/go/tests/tcp_test/users_feature_permissions.go | 7 ++-
bdd/go/tests/tcp_test/users_feature_update.go | 7 ++-
bdd/go/tests/tcp_test/users_steps.go | 24 +++++---
.../go/benchmarks/send_messages_benchmark_test.go | 28 +++++----
.../binary_request_serializer.go | 4 +-
.../binary_response_deserializer.go | 34 +++++-----
.../create_topic_serializer.go | 4 +-
.../fetch_messages_request_serializer_test.go | 9 ++-
.../binary_serialization/identifier_serializer.go | 25 ++------
.../identifier_serializer_test.go | 21 +++----
.../send_messages_request_serializer_test.go | 6 +-
.../go/binary_serialization/stats_serializer.go | 14 ++---
.../update_stream_serializer_test.go | 3 +-
.../update_topic_serializer_test.go | 6 +-
foreign/go/contracts/consumer_groups.go | 18 +++---
foreign/go/contracts/identifier.go | 72 +++++++++++++++++-----
foreign/go/contracts/messages.go | 2 +-
foreign/go/contracts/offets.go | 2 +-
foreign/go/contracts/partitions.go | 4 +-
foreign/go/contracts/stats.go | 14 ++---
foreign/go/contracts/stream.go | 4 +-
foreign/go/contracts/topics.go | 8 +--
foreign/go/errors/constants.go | 4 ++
foreign/go/iggycli/client.go | 4 +-
foreign/go/samples/consumer/consumer.go | 28 ++++++---
foreign/go/samples/producer/producer.go | 18 +++---
foreign/go/tcp/tcp_clients_managament.go | 4 +-
foreign/go/tcp/tcp_topic_managament.go | 2 +-
57 files changed, 572 insertions(+), 427 deletions(-)
diff --git a/bdd/go/tests/tcp_test/consumers_feature_create.go
b/bdd/go/tests/tcp_test/consumers_feature_create.go
index a2bdfb28..91162bc3 100644
--- a/bdd/go/tests/tcp_test/consumers_feature_create.go
+++ b/bdd/go/tests/tcp_test/consumers_feature_create.go
@@ -30,25 +30,27 @@ var _ = ginkgo.Describe("CREATE CONSUMER GROUP:", func() {
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
topicId, _ := successfullyCreateTopic(streamId, client)
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
groupId := createRandomUInt32()
name := createRandomString(16)
_, err := client.CreateConsumerGroup(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
+ streamIdentifier,
+ topicIdentifier,
name,
&groupId,
)
itShouldNotReturnError(err)
- itShouldSuccessfullyCreateConsumer(streamId, topicId,
int(groupId), name, client)
+ itShouldSuccessfullyCreateConsumer(streamId, topicId,
groupId, name, client)
})
ginkgo.Context("and tries to create consumer group for a non
existing stream", func() {
client := createAuthorizedConnection()
groupId := createRandomUInt32()
_, err := client.CreateConsumerGroup(
- iggcon.NewIdentifier(int(createRandomUInt32())),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ randomU32Identifier(),
+ randomU32Identifier(),
createRandomString(16),
&groupId)
@@ -60,9 +62,10 @@ var _ = ginkgo.Describe("CREATE CONSUMER GROUP:", func() {
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
groupId := createRandomUInt32()
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
_, err := client.CreateConsumerGroup(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ streamIdentifier,
+ randomU32Identifier(),
createRandomString(16),
&groupId,
)
@@ -77,10 +80,12 @@ var _ = ginkgo.Describe("CREATE CONSUMER GROUP:", func() {
topicId, _ := successfullyCreateTopic(streamId, client)
_, name := successfullyCreateConsumer(streamId,
topicId, client)
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
groupId := createRandomUInt32()
_, err := client.CreateConsumerGroup(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
+ streamIdentifier,
+ topicIdentifier,
name,
&groupId,
)
@@ -95,12 +100,13 @@ var _ = ginkgo.Describe("CREATE CONSUMER GROUP:", func() {
topicId, _ := successfullyCreateTopic(streamId, client)
groupId, _ := successfullyCreateConsumer(streamId,
topicId, client)
- uint32GroupId := uint32(groupId)
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
_, err := client.CreateConsumerGroup(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
+ streamIdentifier,
+ topicIdentifier,
createRandomString(16),
- &uint32GroupId)
+ &groupId)
itShouldReturnSpecificError(err,
"consumer_group_already_exists")
})
@@ -112,8 +118,11 @@ var _ = ginkgo.Describe("CREATE CONSUMER GROUP:", func() {
topicId, _ := successfullyCreateTopic(streamId, client)
groupId := createRandomUInt32()
- _, err :=
client.CreateConsumerGroup(iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ _, err := client.CreateConsumerGroup(
+ streamIdentifier,
+ topicIdentifier,
createRandomString(256),
&groupId)
@@ -126,8 +135,8 @@ var _ = ginkgo.Describe("CREATE CONSUMER GROUP:", func() {
client := createClient()
groupId := createRandomUInt32()
_, err := client.CreateConsumerGroup(
- iggcon.NewIdentifier(int(createRandomUInt32())),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ randomU32Identifier(),
+ randomU32Identifier(),
createRandomString(16),
&groupId,
)
diff --git a/bdd/go/tests/tcp_test/consumers_feature_delete.go
b/bdd/go/tests/tcp_test/consumers_feature_delete.go
index 3b2e0516..00aa8033 100644
--- a/bdd/go/tests/tcp_test/consumers_feature_delete.go
+++ b/bdd/go/tests/tcp_test/consumers_feature_delete.go
@@ -32,10 +32,14 @@ var _ = ginkgo.Describe("DELETE CONSUMER GROUP:", func() {
defer deleteStreamAfterTests(streamId, client)
topicId, _ := successfullyCreateTopic(streamId, client)
groupId, _ := successfullyCreateConsumer(streamId,
topicId, client)
+
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ groupIdentifier, _ := iggcon.NewIdentifier(groupId)
err := client.DeleteConsumerGroup(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
- iggcon.NewIdentifier(groupId),
+ streamIdentifier,
+ topicIdentifier,
+ groupIdentifier,
)
itShouldNotReturnError(err)
@@ -47,11 +51,13 @@ var _ = ginkgo.Describe("DELETE CONSUMER GROUP:", func() {
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
topicId, _ := successfullyCreateTopic(streamId, client)
- groupId := int(createRandomUInt32())
+
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
err := client.DeleteConsumerGroup(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
- iggcon.NewIdentifier(groupId),
+ streamIdentifier,
+ topicIdentifier,
+ randomU32Identifier(),
)
itShouldReturnSpecificIggyError(err,
ierror.ConsumerGroupIdNotFound)
@@ -61,12 +67,12 @@ var _ = ginkgo.Describe("DELETE CONSUMER GROUP:", func() {
client := createAuthorizedConnection()
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
- topicId := int(createRandomUInt32())
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
err := client.DeleteConsumerGroup(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ streamIdentifier,
+ randomU32Identifier(),
+ randomU32Identifier(),
)
itShouldReturnSpecificError(err, "topic_id_not_found")
@@ -74,13 +80,10 @@ var _ = ginkgo.Describe("DELETE CONSUMER GROUP:", func() {
ginkgo.Context("and tries to delete consumer for non-existing
topic and stream", func() {
client := createAuthorizedConnection()
- streamId := int(createRandomUInt32())
- topicId := int(createRandomUInt32())
-
err := client.DeleteConsumerGroup(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ randomU32Identifier(),
+ randomU32Identifier(),
+ randomU32Identifier(),
)
itShouldReturnSpecificError(err, "stream_id_not_found")
@@ -91,9 +94,9 @@ var _ = ginkgo.Describe("DELETE CONSUMER GROUP:", func() {
ginkgo.Context("and tries to delete consumer group", func() {
client := createClient()
err := client.DeleteConsumerGroup(
- iggcon.NewIdentifier(int(createRandomUInt32())),
- iggcon.NewIdentifier(int(createRandomUInt32())),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ randomU32Identifier(),
+ randomU32Identifier(),
+ randomU32Identifier(),
)
itShouldReturnUnauthenticatedError(err)
diff --git a/bdd/go/tests/tcp_test/consumers_feature_get_all.go
b/bdd/go/tests/tcp_test/consumers_feature_get_all.go
index ad4d12e1..d4546fce 100644
--- a/bdd/go/tests/tcp_test/consumers_feature_get_all.go
+++ b/bdd/go/tests/tcp_test/consumers_feature_get_all.go
@@ -31,7 +31,9 @@ var _ = ginkgo.Describe("GET ALL CONSUMER GROUPS:", func() {
defer deleteStreamAfterTests(streamId, client)
topicId, _ := successfullyCreateTopic(streamId, client)
groupId, name := successfullyCreateConsumer(streamId,
topicId, client)
- groups, err :=
client.GetConsumerGroups(iggcon.NewIdentifier(streamId),
iggcon.NewIdentifier(topicId))
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ groups, err :=
client.GetConsumerGroups(streamIdentifier, topicIdentifier)
itShouldNotReturnError(err)
itShouldContainSpecificConsumer(groupId, name, groups)
@@ -41,7 +43,7 @@ var _ = ginkgo.Describe("GET ALL CONSUMER GROUPS:", func() {
ginkgo.When("User is not logged in", func() {
ginkgo.Context("and tries to get all consumer groups", func() {
client := createClient()
- _, err :=
client.GetConsumerGroups(iggcon.NewIdentifier(int(createRandomUInt32())),
iggcon.NewIdentifier(int(createRandomUInt32())))
+ _, err :=
client.GetConsumerGroups(randomU32Identifier(), randomU32Identifier())
itShouldReturnUnauthenticatedError(err)
})
diff --git a/bdd/go/tests/tcp_test/consumers_feature_get_by_id.go
b/bdd/go/tests/tcp_test/consumers_feature_get_by_id.go
index 81ecbaa7..480dcc6f 100644
--- a/bdd/go/tests/tcp_test/consumers_feature_get_by_id.go
+++ b/bdd/go/tests/tcp_test/consumers_feature_get_by_id.go
@@ -32,7 +32,10 @@ var _ = ginkgo.Describe("GET CONSUMER GROUP BY ID:", func() {
defer deleteStreamAfterTests(streamId, client)
topicId, _ := successfullyCreateTopic(streamId, client)
groupId, name := successfullyCreateConsumer(streamId,
topicId, client)
- group, err :=
client.GetConsumerGroup(iggcon.NewIdentifier(streamId),
iggcon.NewIdentifier(topicId), iggcon.NewIdentifier(groupId))
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ groupIdentifier, _ := iggcon.NewIdentifier(groupId)
+ group, err := client.GetConsumerGroup(streamIdentifier,
topicIdentifier, groupIdentifier)
itShouldNotReturnError(err)
itShouldReturnSpecificConsumer(groupId, name,
&group.ConsumerGroup)
@@ -42,9 +45,10 @@ var _ = ginkgo.Describe("GET CONSUMER GROUP BY ID:", func() {
client := createAuthorizedConnection()
_, err := client.GetConsumerGroup(
- iggcon.NewIdentifier(int(createRandomUInt32())),
- iggcon.NewIdentifier(int(createRandomUInt32())),
- iggcon.NewIdentifier(int(createRandomUInt32())))
+ randomU32Identifier(),
+ randomU32Identifier(),
+ randomU32Identifier(),
+ )
itShouldReturnSpecificIggyError(err,
ierror.ConsumerGroupIdNotFound)
})
@@ -53,11 +57,12 @@ var _ = ginkgo.Describe("GET CONSUMER GROUP BY ID:", func()
{
client := createAuthorizedConnection()
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
-
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
_, err := client.GetConsumerGroup(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(int(createRandomUInt32())),
- iggcon.NewIdentifier(int(createRandomUInt32())))
+ streamIdentifier,
+ randomU32Identifier(),
+ randomU32Identifier(),
+ )
itShouldReturnSpecificIggyError(err,
ierror.ConsumerGroupIdNotFound)
})
@@ -67,11 +72,13 @@ var _ = ginkgo.Describe("GET CONSUMER GROUP BY ID:", func()
{
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
topicId, _ := successfullyCreateTopic(streamId, client)
-
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
_, err := client.GetConsumerGroup(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
- iggcon.NewIdentifier(int(createRandomUInt32())))
+ streamIdentifier,
+ topicIdentifier,
+ randomU32Identifier(),
+ )
itShouldReturnSpecificIggyError(err,
ierror.ConsumerGroupIdNotFound)
})
diff --git a/bdd/go/tests/tcp_test/consumers_feature_join.go
b/bdd/go/tests/tcp_test/consumers_feature_join.go
index 5990573f..51e5654b 100644
--- a/bdd/go/tests/tcp_test/consumers_feature_join.go
+++ b/bdd/go/tests/tcp_test/consumers_feature_join.go
@@ -31,10 +31,13 @@ var _ = ginkgo.Describe("JOIN CONSUMER GROUP:", func() {
defer deleteStreamAfterTests(streamId, client)
topicId, _ := successfullyCreateTopic(streamId, client)
groupId, _ := successfullyCreateConsumer(streamId,
topicId, client)
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ groupIdentifier, _ := iggcon.NewIdentifier(groupId)
err := client.JoinConsumerGroup(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
- iggcon.NewIdentifier(groupId),
+ streamIdentifier,
+ topicIdentifier,
+ groupIdentifier,
)
itShouldNotReturnError(err)
@@ -46,10 +49,12 @@ var _ = ginkgo.Describe("JOIN CONSUMER GROUP:", func() {
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
topicId, _ := successfullyCreateTopic(streamId, client)
- groupId := int(createRandomUInt32())
- err :=
client.JoinConsumerGroup(iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
- iggcon.NewIdentifier(groupId),
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ err := client.JoinConsumerGroup(
+ streamIdentifier,
+ topicIdentifier,
+ randomU32Identifier(),
)
itShouldReturnSpecificError(err,
"consumer_group_not_found")
@@ -59,12 +64,11 @@ var _ = ginkgo.Describe("JOIN CONSUMER GROUP:", func() {
client := createAuthorizedConnection()
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
- topicId := int(createRandomUInt32())
-
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
err := client.JoinConsumerGroup(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ streamIdentifier,
+ randomU32Identifier(),
+ randomU32Identifier(),
)
itShouldReturnSpecificError(err, "topic_id_not_found")
@@ -72,13 +76,10 @@ var _ = ginkgo.Describe("JOIN CONSUMER GROUP:", func() {
ginkgo.Context("and tries to join consumer for non-existing
topic and stream", func() {
client := createAuthorizedConnection()
- streamId := int(createRandomUInt32())
- topicId := int(createRandomUInt32())
-
err := client.JoinConsumerGroup(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ randomU32Identifier(),
+ randomU32Identifier(),
+ randomU32Identifier(),
)
itShouldReturnSpecificError(err, "stream_id_not_found")
@@ -89,9 +90,9 @@ var _ = ginkgo.Describe("JOIN CONSUMER GROUP:", func() {
ginkgo.Context("and tries to join to the consumer group",
func() {
client := createClient()
err := client.JoinConsumerGroup(
- iggcon.NewIdentifier(int(createRandomUInt32())),
- iggcon.NewIdentifier(int(createRandomUInt32())),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ randomU32Identifier(),
+ randomU32Identifier(),
+ randomU32Identifier(),
)
itShouldReturnUnauthenticatedError(err)
diff --git a/bdd/go/tests/tcp_test/consumers_feature_leave.go
b/bdd/go/tests/tcp_test/consumers_feature_leave.go
index 3cdee835..79e2dfca 100644
--- a/bdd/go/tests/tcp_test/consumers_feature_leave.go
+++ b/bdd/go/tests/tcp_test/consumers_feature_leave.go
@@ -33,10 +33,13 @@ var _ = ginkgo.Describe("LEAVE CONSUMER GROUP:", func() {
groupId, _ := successfullyCreateConsumer(streamId,
topicId, client)
successfullyJoinConsumer(streamId, topicId, groupId,
client)
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ groupIdentifier, _ := iggcon.NewIdentifier(groupId)
err := client.LeaveConsumerGroup(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
- iggcon.NewIdentifier(groupId),
+ streamIdentifier,
+ topicIdentifier,
+ groupIdentifier,
)
itShouldNotReturnError(err)
@@ -48,11 +51,12 @@ var _ = ginkgo.Describe("LEAVE CONSUMER GROUP:", func() {
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
topicId, _ := successfullyCreateTopic(streamId, client)
- groupId := int(createRandomUInt32())
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
err := client.LeaveConsumerGroup(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
- iggcon.NewIdentifier(groupId),
+ streamIdentifier,
+ topicIdentifier,
+ randomU32Identifier(),
)
itShouldReturnSpecificError(err,
"consumer_group_not_found")
@@ -62,12 +66,11 @@ var _ = ginkgo.Describe("LEAVE CONSUMER GROUP:", func() {
client := createAuthorizedConnection()
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
- topicId := int(createRandomUInt32())
-
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
err := client.LeaveConsumerGroup(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ streamIdentifier,
+ randomU32Identifier(),
+ randomU32Identifier(),
)
itShouldReturnSpecificError(err, "topic_id_not_found")
@@ -75,13 +78,11 @@ var _ = ginkgo.Describe("LEAVE CONSUMER GROUP:", func() {
ginkgo.Context("and tries to leave consumer for non-existing
topic and stream", func() {
client := createAuthorizedConnection()
- streamId := int(createRandomUInt32())
- topicId := int(createRandomUInt32())
err := client.LeaveConsumerGroup(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ randomU32Identifier(),
+ randomU32Identifier(),
+ randomU32Identifier(),
)
itShouldReturnSpecificError(err, "stream_id_not_found")
@@ -92,9 +93,9 @@ var _ = ginkgo.Describe("LEAVE CONSUMER GROUP:", func() {
ginkgo.Context("and tries to leave to the consumer group",
func() {
client := createClient()
err := client.LeaveConsumerGroup(
- iggcon.NewIdentifier(int(createRandomUInt32())),
- iggcon.NewIdentifier(int(createRandomUInt32())),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ randomU32Identifier(),
+ randomU32Identifier(),
+ randomU32Identifier(),
)
itShouldReturnUnauthenticatedError(err)
diff --git a/bdd/go/tests/tcp_test/consumers_steps.go
b/bdd/go/tests/tcp_test/consumers_steps.go
index 1e644067..d14b909c 100644
--- a/bdd/go/tests/tcp_test/consumers_steps.go
+++ b/bdd/go/tests/tcp_test/consumers_steps.go
@@ -18,8 +18,7 @@
package tcp_test
import (
- "strconv"
-
+ "fmt"
iggcon "github.com/apache/iggy/foreign/go/contracts"
"github.com/apache/iggy/foreign/go/iggycli"
"github.com/onsi/ginkgo/v2"
@@ -27,26 +26,30 @@ import (
)
// operations
-func successfullyCreateConsumer(streamId int, topicId int, cli iggycli.Client)
(int, string) {
+func successfullyCreateConsumer(streamId uint32, topicId uint32, cli
iggycli.Client) (uint32, string) {
groupId := createRandomUInt32()
name := createRandomString(16)
- _, err := cli.CreateConsumerGroup(iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ _, err := cli.CreateConsumerGroup(
+ streamIdentifier,
+ topicIdentifier,
name,
- &groupId,
- )
+ &groupId)
- itShouldSuccessfullyCreateConsumer(streamId, topicId, int(groupId),
name, cli)
+ itShouldSuccessfullyCreateConsumer(streamId, topicId, groupId, name,
cli)
itShouldNotReturnError(err)
- return int(groupId), name
+ return groupId, name
}
-func successfullyJoinConsumer(streamId int, topicId int, groupId int, client
iggycli.Client) {
-
+func successfullyJoinConsumer(streamId uint32, topicId uint32, groupId uint32,
client iggycli.Client) {
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ groupIdentifier, _ := iggcon.NewIdentifier(groupId)
err := client.JoinConsumerGroup(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
- iggcon.NewIdentifier(groupId),
+ streamIdentifier,
+ topicIdentifier,
+ groupIdentifier,
)
itShouldSuccessfullyJoinConsumer(streamId, topicId, groupId, client)
@@ -55,7 +58,7 @@ func successfullyJoinConsumer(streamId int, topicId int,
groupId int, client igg
//assertions
-func itShouldReturnSpecificConsumer(id int, name string, consumer
*iggcon.ConsumerGroup) {
+func itShouldReturnSpecificConsumer(id uint32, name string, consumer
*iggcon.ConsumerGroup) {
ginkgo.It("should fetch consumer with id "+string(rune(id)), func() {
gomega.Expect(consumer).NotTo(gomega.BeNil())
gomega.Expect(consumer.Id).To(gomega.Equal(id))
@@ -67,7 +70,7 @@ func itShouldReturnSpecificConsumer(id int, name string,
consumer *iggcon.Consum
})
}
-func itShouldContainSpecificConsumer(id int, name string, consumers
[]iggcon.ConsumerGroup) {
+func itShouldContainSpecificConsumer(id uint32, name string, consumers
[]iggcon.ConsumerGroup) {
ginkgo.It("should fetch at least one consumer", func() {
gomega.Expect(len(consumers)).NotTo(gomega.Equal(0))
})
@@ -83,7 +86,7 @@ func itShouldContainSpecificConsumer(id int, name string,
consumers []iggcon.Con
}
}
- ginkgo.It("should fetch consumer with id "+strconv.Itoa(id), func() {
+ ginkgo.It(fmt.Sprintf("should fetch consumer with id %d", id), func() {
gomega.Expect(found).To(gomega.BeTrue(), "Consumer with id %d
and name %s not found", id, name)
gomega.Expect(consumer.Id).To(gomega.Equal(id))
})
@@ -94,9 +97,11 @@ func itShouldContainSpecificConsumer(id int, name string,
consumers []iggcon.Con
})
}
-func itShouldSuccessfullyCreateConsumer(streamId int, topicId int, groupId
int, expectedName string, client iggycli.Client) {
- consumer, err :=
client.GetConsumerGroup(iggcon.NewIdentifier(streamId),
iggcon.NewIdentifier(topicId), iggcon.NewIdentifier(groupId))
-
+func itShouldSuccessfullyCreateConsumer(streamId uint32, topicId uint32,
groupId uint32, expectedName string, client iggycli.Client) {
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ groupIdentifier, _ := iggcon.NewIdentifier(groupId)
+ consumer, err := client.GetConsumerGroup(streamIdentifier,
topicIdentifier, groupIdentifier)
ginkgo.It("should create consumer with id "+string(rune(groupId)),
func() {
gomega.Expect(consumer).NotTo(gomega.BeNil())
gomega.Expect(consumer.Id).To(gomega.Equal(groupId))
@@ -109,17 +114,22 @@ func itShouldSuccessfullyCreateConsumer(streamId int,
topicId int, groupId int,
itShouldNotReturnError(err)
}
-func itShouldSuccessfullyDeletedConsumer(streamId int, topicId int, groupId
int, client iggycli.Client) {
- consumer, err :=
client.GetConsumerGroup(iggcon.NewIdentifier(streamId),
iggcon.NewIdentifier(topicId), iggcon.NewIdentifier(groupId))
-
+func itShouldSuccessfullyDeletedConsumer(streamId uint32, topicId uint32,
groupId uint32, client iggycli.Client) {
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ groupIdentifier, _ := iggcon.NewIdentifier(groupId)
+ consumer, err := client.GetConsumerGroup(streamIdentifier,
topicIdentifier, groupIdentifier)
itShouldReturnSpecificError(err, "consumer_group_not_found")
ginkgo.It("should not return consumer", func() {
gomega.Expect(consumer).To(gomega.BeNil())
})
}
-func itShouldSuccessfullyJoinConsumer(streamId int, topicId int, groupId int,
client iggycli.Client) {
- consumer, err :=
client.GetConsumerGroup(iggcon.NewIdentifier(streamId),
iggcon.NewIdentifier(topicId), iggcon.NewIdentifier(groupId))
+func itShouldSuccessfullyJoinConsumer(streamId uint32, topicId uint32, groupId
uint32, client iggycli.Client) {
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ groupIdentifier, _ := iggcon.NewIdentifier(groupId)
+ consumer, err := client.GetConsumerGroup(streamIdentifier,
topicIdentifier, groupIdentifier)
ginkgo.It("should join consumer with id "+string(rune(groupId)), func()
{
gomega.Expect(consumer).NotTo(gomega.BeNil())
@@ -129,12 +139,14 @@ func itShouldSuccessfullyJoinConsumer(streamId int,
topicId int, groupId int, cl
itShouldNotReturnError(err)
}
-func itShouldSuccessfullyLeaveConsumer(streamId int, topicId int, groupId int,
client iggycli.Client) {
- consumer, err :=
client.GetConsumerGroup(iggcon.NewIdentifier(streamId),
iggcon.NewIdentifier(topicId), iggcon.NewIdentifier(groupId))
-
+func itShouldSuccessfullyLeaveConsumer(streamId uint32, topicId uint32,
groupId uint32, client iggycli.Client) {
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ groupIdentifier, _ := iggcon.NewIdentifier(groupId)
+ consumer, err := client.GetConsumerGroup(streamIdentifier,
topicIdentifier, groupIdentifier)
ginkgo.It("should leave consumer with id "+string(rune(groupId)),
func() {
gomega.Expect(consumer).NotTo(gomega.BeNil())
- gomega.Expect(consumer.MembersCount).To(gomega.Equal(0))
+ gomega.Expect(consumer.MembersCount).To(gomega.Equal(uint32(0)))
})
itShouldNotReturnError(err)
diff --git a/bdd/go/tests/tcp_test/messages_feature_send.go
b/bdd/go/tests/tcp_test/messages_feature_send.go
index 0fcf0d7b..00dd5835 100644
--- a/bdd/go/tests/tcp_test/messages_feature_send.go
+++ b/bdd/go/tests/tcp_test/messages_feature_send.go
@@ -31,9 +31,11 @@ var _ = ginkgo.Describe("SEND MESSAGES:", func() {
defer deleteStreamAfterTests(streamId, client)
topicId, _ := successfullyCreateTopic(streamId, client)
messages := createDefaultMessages()
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
err := client.SendMessages(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
+ streamIdentifier,
+ topicIdentifier,
iggcon.None(),
messages,
)
@@ -46,9 +48,10 @@ var _ = ginkgo.Describe("SEND MESSAGES:", func() {
streamId, _ := successfullyCreateStream("2"+prefix,
client)
defer deleteStreamAfterTests(streamId, client)
messages := createDefaultMessages()
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
err := client.SendMessages(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ streamIdentifier,
+ randomU32Identifier(),
iggcon.None(),
messages,
)
@@ -59,8 +62,8 @@ var _ = ginkgo.Describe("SEND MESSAGES:", func() {
client := createAuthorizedConnection()
messages := createDefaultMessages()
err := client.SendMessages(
- iggcon.NewIdentifier(int(createRandomUInt32())),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ randomU32Identifier(),
+ randomU32Identifier(),
iggcon.None(),
messages,
)
@@ -73,9 +76,11 @@ var _ = ginkgo.Describe("SEND MESSAGES:", func() {
defer deleteStreamAfterTests(streamId, client)
topicId, _ := successfullyCreateTopic(streamId, client)
messages := createDefaultMessages()
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
err := client.SendMessages(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
+ streamIdentifier,
+ topicIdentifier,
iggcon.PartitionId(int(createRandomUInt32())),
messages,
)
@@ -87,9 +92,11 @@ var _ = ginkgo.Describe("SEND MESSAGES:", func() {
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId,
createAuthorizedConnection())
topicId, _ := successfullyCreateTopic(streamId, client)
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
err := client.SendMessages(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
+ streamIdentifier,
+ topicIdentifier,
iggcon.PartitionId(int(createRandomUInt32())),
[]iggcon.IggyMessage{},
)
@@ -102,8 +109,8 @@ var _ = ginkgo.Describe("SEND MESSAGES:", func() {
client := createClient()
messages := createDefaultMessages()
err := client.SendMessages(
- iggcon.NewIdentifier(int(createRandomUInt32())),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ randomU32Identifier(),
+ randomU32Identifier(),
iggcon.None(),
messages,
)
diff --git a/bdd/go/tests/tcp_test/messages_steps.go
b/bdd/go/tests/tcp_test/messages_steps.go
index b0ba4316..b169c0c8 100644
--- a/bdd/go/tests/tcp_test/messages_steps.go
+++ b/bdd/go/tests/tcp_test/messages_steps.go
@@ -42,13 +42,15 @@ func createDefaultMessages() []iggcon.IggyMessage {
return []iggcon.IggyMessage{msg1, msg2}
}
-func itShouldSuccessfullyPublishMessages(streamId int, topicId int, messages
[]iggcon.IggyMessage, client iggycli.Client) {
+func itShouldSuccessfullyPublishMessages(streamId uint32, topicId uint32,
messages []iggcon.IggyMessage, client iggycli.Client) {
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
result, err := client.PollMessages(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
+ streamIdentifier,
+ topicIdentifier,
iggcon.Consumer{
Kind: iggcon.ConsumerKindSingle,
- Id: iggcon.NewIdentifier(int(createRandomUInt32())),
+ Id: randomU32Identifier(),
},
iggcon.FirstPollingStrategy(),
uint32(len(messages)),
diff --git a/bdd/go/tests/tcp_test/partitions_feature_create.go
b/bdd/go/tests/tcp_test/partitions_feature_create.go
index 89529fa3..0ddfe0fe 100644
--- a/bdd/go/tests/tcp_test/partitions_feature_create.go
+++ b/bdd/go/tests/tcp_test/partitions_feature_create.go
@@ -30,11 +30,14 @@ var _ = ginkgo.Describe("CREATE PARTITION:", func() {
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
topicId, _ := successfullyCreateTopic(streamId, client)
- partitionsCount := 10
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ partitionsCount := uint32(10)
err := client.CreatePartitions(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
- uint32(partitionsCount))
+ streamIdentifier,
+ topicIdentifier,
+ partitionsCount,
+ )
itShouldNotReturnError(err)
itShouldHaveExpectedNumberOfPartitions(streamId,
topicId, partitionsCount+2, client)
@@ -43,8 +46,8 @@ var _ = ginkgo.Describe("CREATE PARTITION:", func() {
ginkgo.Context("and tries to create partitions for a non
existing stream", func() {
client := createAuthorizedConnection()
err := client.CreatePartitions(
- iggcon.NewIdentifier(int(createRandomUInt32())),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ randomU32Identifier(),
+ randomU32Identifier(),
10,
)
@@ -55,9 +58,10 @@ var _ = ginkgo.Describe("CREATE PARTITION:", func() {
client := createAuthorizedConnection()
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
err := client.CreatePartitions(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ streamIdentifier,
+ randomU32Identifier(),
10,
)
@@ -69,8 +73,8 @@ var _ = ginkgo.Describe("CREATE PARTITION:", func() {
ginkgo.Context("and tries to create partitions", func() {
client := createClient()
err := client.CreatePartitions(
- iggcon.NewIdentifier(int(createRandomUInt32())),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ randomU32Identifier(),
+ randomU32Identifier(),
10,
)
diff --git a/bdd/go/tests/tcp_test/partitions_feature_delete.go
b/bdd/go/tests/tcp_test/partitions_feature_delete.go
index dd51f3d6..14c575f0 100644
--- a/bdd/go/tests/tcp_test/partitions_feature_delete.go
+++ b/bdd/go/tests/tcp_test/partitions_feature_delete.go
@@ -30,11 +30,12 @@ var _ = ginkgo.Describe("DELETE PARTITION:", func() {
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
topicId, _ := successfullyCreateTopic(streamId, client)
-
- partitionsCount := 1
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ partitionsCount := uint32(1)
err := client.DeletePartitions(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
+ streamIdentifier,
+ topicIdentifier,
1,
)
@@ -45,8 +46,8 @@ var _ = ginkgo.Describe("DELETE PARTITION:", func() {
ginkgo.Context("and tries to delete partitions for a non
existing stream", func() {
client := createAuthorizedConnection()
err := client.DeletePartitions(
- iggcon.NewIdentifier(int(createRandomUInt32())),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ randomU32Identifier(),
+ randomU32Identifier(),
10,
)
@@ -57,9 +58,10 @@ var _ = ginkgo.Describe("DELETE PARTITION:", func() {
client := createAuthorizedConnection()
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
err := client.DeletePartitions(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ streamIdentifier,
+ randomU32Identifier(),
10,
)
@@ -71,8 +73,8 @@ var _ = ginkgo.Describe("DELETE PARTITION:", func() {
ginkgo.Context("and tries to delete partitions", func() {
client := createClient()
err := client.DeletePartitions(
- iggcon.NewIdentifier(int(createRandomUInt32())),
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ randomU32Identifier(),
+ randomU32Identifier(),
10,
)
diff --git a/bdd/go/tests/tcp_test/partitions_steps.go
b/bdd/go/tests/tcp_test/partitions_steps.go
index ee9327d6..78e5f188 100644
--- a/bdd/go/tests/tcp_test/partitions_steps.go
+++ b/bdd/go/tests/tcp_test/partitions_steps.go
@@ -24,13 +24,15 @@ import (
"github.com/onsi/gomega"
)
-func itShouldHaveExpectedNumberOfPartitions(streamId int, topicId int,
expectedPartitions int, client iggycli.Client) {
- topic, err := client.GetTopic(iggcon.NewIdentifier(streamId),
iggcon.NewIdentifier(topicId))
+func itShouldHaveExpectedNumberOfPartitions(streamId uint32, topicId uint32,
expectedPartitions uint32, client iggycli.Client) {
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ topic, err := client.GetTopic(streamIdentifier, topicIdentifier)
ginkgo.It("should have "+string(rune(expectedPartitions))+"
partitions", func() {
gomega.Expect(topic).NotTo(gomega.BeNil())
gomega.Expect(topic.PartitionsCount).To(gomega.Equal(expectedPartitions))
-
gomega.Expect(len(topic.Partitions)).To(gomega.Equal(expectedPartitions))
+
gomega.Expect(len(topic.Partitions)).To(gomega.Equal(int(expectedPartitions)))
})
itShouldNotReturnError(err)
diff --git a/bdd/go/tests/tcp_test/stream_feature_create.go
b/bdd/go/tests/tcp_test/stream_feature_create.go
index 44116f76..60f18be3 100644
--- a/bdd/go/tests/tcp_test/stream_feature_create.go
+++ b/bdd/go/tests/tcp_test/stream_feature_create.go
@@ -29,10 +29,10 @@ var _ = ginkgo.Describe("CREATE STREAM:", func() {
name := createRandomString(32)
_, err := client.CreateStream(name, &streamId)
- defer deleteStreamAfterTests(int(streamId), client)
+ defer deleteStreamAfterTests(streamId, client)
itShouldNotReturnError(err)
- itShouldSuccessfullyCreateStream(int(streamId), name,
client)
+ itShouldSuccessfullyCreateStream(streamId, name, client)
})
ginkgo.Context("and tries to create stream with duplicate
stream name", func() {
@@ -41,10 +41,10 @@ var _ = ginkgo.Describe("CREATE STREAM:", func() {
name := createRandomString(32)
_, err := client.CreateStream(name, &streamId)
- defer deleteStreamAfterTests(int(streamId), client)
+ defer deleteStreamAfterTests(streamId, client)
itShouldNotReturnError(err)
- itShouldSuccessfullyCreateStream(int(streamId), name,
client)
+ itShouldSuccessfullyCreateStream(streamId, name, client)
anotherStreamId := createRandomUInt32()
_, err = client.CreateStream(name, &anotherStreamId)
@@ -58,10 +58,10 @@ var _ = ginkgo.Describe("CREATE STREAM:", func() {
name := createRandomString(32)
_, err := client.CreateStream(name, &streamId)
- defer deleteStreamAfterTests(int(streamId), client)
+ defer deleteStreamAfterTests(streamId, client)
itShouldNotReturnError(err)
- itShouldSuccessfullyCreateStream(int(streamId), name,
client)
+ itShouldSuccessfullyCreateStream(streamId, name, client)
_, err = client.CreateStream(createRandomString(32),
&streamId)
diff --git a/bdd/go/tests/tcp_test/stream_feature_delete.go
b/bdd/go/tests/tcp_test/stream_feature_delete.go
index ddda1c6a..a7468def 100644
--- a/bdd/go/tests/tcp_test/stream_feature_delete.go
+++ b/bdd/go/tests/tcp_test/stream_feature_delete.go
@@ -28,7 +28,8 @@ var _ = ginkgo.Describe("DELETE STREAM:", func() {
ginkgo.Context("and tries to delete existing stream", func() {
client := createAuthorizedConnection()
streamId, _ := successfullyCreateStream(prefix, client)
- err :=
client.DeleteStream(iggcon.NewIdentifier(streamId))
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ err := client.DeleteStream(streamIdentifier)
itShouldNotReturnError(err)
itShouldSuccessfullyDeleteStream(streamId, client)
@@ -36,9 +37,8 @@ var _ = ginkgo.Describe("DELETE STREAM:", func() {
ginkgo.Context("and tries to delete non-existing stream",
func() {
client := createAuthorizedConnection()
- streamId := int(createRandomUInt32())
- err :=
client.DeleteStream(iggcon.NewIdentifier(streamId))
+ err := client.DeleteStream(randomU32Identifier())
itShouldReturnSpecificError(err, "stream_id_not_found")
})
@@ -47,7 +47,7 @@ var _ = ginkgo.Describe("DELETE STREAM:", func() {
ginkgo.When("User is not logged in", func() {
ginkgo.Context("and tries to delete stream", func() {
client := createClient()
- err :=
client.DeleteStream(iggcon.NewIdentifier(int(createRandomUInt32())))
+ err := client.DeleteStream(randomU32Identifier())
itShouldReturnUnauthenticatedError(err)
})
diff --git a/bdd/go/tests/tcp_test/stream_feature_get_by_id.go
b/bdd/go/tests/tcp_test/stream_feature_get_by_id.go
index 302e60d6..d978922b 100644
--- a/bdd/go/tests/tcp_test/stream_feature_get_by_id.go
+++ b/bdd/go/tests/tcp_test/stream_feature_get_by_id.go
@@ -29,7 +29,8 @@ var _ = ginkgo.Describe("GET STREAM BY ID:", func() {
client := createAuthorizedConnection()
streamId, name := successfullyCreateStream(prefix,
client)
defer deleteStreamAfterTests(streamId, client)
- stream, err :=
client.GetStream(iggcon.NewIdentifier(streamId))
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ stream, err := client.GetStream(streamIdentifier)
itShouldNotReturnError(err)
itShouldReturnSpecificStream(streamId, name, *stream)
@@ -37,9 +38,8 @@ var _ = ginkgo.Describe("GET STREAM BY ID:", func() {
ginkgo.Context("and tries to get non-existing stream", func() {
client := createAuthorizedConnection()
- streamId := int(createRandomUInt32())
- _, err :=
client.GetStream(iggcon.NewIdentifier(streamId))
+ _, err := client.GetStream(randomU32Identifier())
itShouldReturnSpecificError(err, "stream_id_not_found")
})
diff --git a/bdd/go/tests/tcp_test/stream_feature_update.go
b/bdd/go/tests/tcp_test/stream_feature_update.go
index da87ccb8..2cb97334 100644
--- a/bdd/go/tests/tcp_test/stream_feature_update.go
+++ b/bdd/go/tests/tcp_test/stream_feature_update.go
@@ -30,8 +30,8 @@ var _ = ginkgo.Describe("UPDATE STREAM:", func() {
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
newName := createRandomString(128)
-
- err :=
client.UpdateStream(iggcon.NewIdentifier(streamId), newName)
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ err := client.UpdateStream(streamIdentifier, newName)
itShouldNotReturnError(err)
itShouldSuccessfullyUpdateStream(streamId, newName,
client)
})
@@ -43,14 +43,15 @@ var _ = ginkgo.Describe("UPDATE STREAM:", func() {
defer deleteStreamAfterTests(stream1Id, client)
defer deleteStreamAfterTests(stream2Id, client)
- err :=
client.UpdateStream(iggcon.NewIdentifier(stream2Id), stream1Name)
+ stream2Identifier, _ := iggcon.NewIdentifier(stream2Id)
+ err := client.UpdateStream(stream2Identifier,
stream1Name)
itShouldReturnSpecificError(err,
"stream_name_already_exists")
})
ginkgo.Context("and tries to update non-existing stream",
func() {
client := createAuthorizedConnection()
- err :=
client.UpdateStream(iggcon.NewIdentifier(int(createRandomUInt32())),
createRandomString(128))
+ err := client.UpdateStream(randomU32Identifier(),
createRandomString(128))
itShouldReturnSpecificError(err, "stream_id_not_found")
})
@@ -59,8 +60,8 @@ var _ = ginkgo.Describe("UPDATE STREAM:", func() {
client := createAuthorizedConnection()
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId,
createAuthorizedConnection())
-
- err :=
client.UpdateStream(iggcon.NewIdentifier(streamId), createRandomString(256))
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ err := client.UpdateStream(streamIdentifier,
createRandomString(256))
itShouldReturnSpecificError(err, "stream_name_too_long")
})
@@ -69,7 +70,7 @@ var _ = ginkgo.Describe("UPDATE STREAM:", func() {
ginkgo.When("User is not logged in", func() {
ginkgo.Context("and tries to update stream", func() {
client := createClient()
- err :=
client.UpdateStream(iggcon.NewIdentifier(int(createRandomUInt32())),
createRandomString(128))
+ err := client.UpdateStream(randomU32Identifier(),
createRandomString(128))
itShouldReturnUnauthenticatedError(err)
})
diff --git a/bdd/go/tests/tcp_test/stream_steps.go
b/bdd/go/tests/tcp_test/stream_steps.go
index 3eea199f..0581a5d0 100644
--- a/bdd/go/tests/tcp_test/stream_steps.go
+++ b/bdd/go/tests/tcp_test/stream_steps.go
@@ -18,8 +18,7 @@
package tcp_test
import (
- "strconv"
-
+ "fmt"
iggcon "github.com/apache/iggy/foreign/go/contracts"
ierror "github.com/apache/iggy/foreign/go/errors"
"github.com/apache/iggy/foreign/go/iggycli"
@@ -29,20 +28,20 @@ import (
//operations
-func successfullyCreateStream(prefix string, client iggycli.Client) (int,
string) {
+func successfullyCreateStream(prefix string, client iggycli.Client) (uint32,
string) {
streamId := createRandomUInt32()
name := createRandomStringWithPrefix(prefix, 128)
_, err := client.CreateStream(name, &streamId)
itShouldNotReturnError(err)
- itShouldSuccessfullyCreateStream(int(streamId), name, client)
- return int(streamId), name
+ itShouldSuccessfullyCreateStream(streamId, name, client)
+ return streamId, name
}
//assertions
-func itShouldReturnSpecificStream(id int, name string, stream
iggcon.StreamDetails) {
+func itShouldReturnSpecificStream(id uint32, name string, stream
iggcon.StreamDetails) {
ginkgo.It("should fetch stream with id "+string(rune(id)), func() {
gomega.Expect(stream.Id).To(gomega.Equal(id))
})
@@ -52,7 +51,7 @@ func itShouldReturnSpecificStream(id int, name string, stream
iggcon.StreamDetai
})
}
-func itShouldContainSpecificStream(id int, name string, streams
[]iggcon.Stream) {
+func itShouldContainSpecificStream(id uint32, name string, streams
[]iggcon.Stream) {
ginkgo.It("should fetch at least one stream", func() {
gomega.Expect(len(streams)).NotTo(gomega.Equal(0))
})
@@ -68,7 +67,7 @@ func itShouldContainSpecificStream(id int, name string,
streams []iggcon.Stream)
}
}
- ginkgo.It("should fetch stream with id "+strconv.Itoa(id), func() {
+ ginkgo.It(fmt.Sprintf("should fetch stream with id %d", id), func() {
gomega.Expect(found).To(gomega.BeTrue(), "Stream with id %d and
name %s not found", id, name)
gomega.Expect(stream.Id).To(gomega.Equal(id))
})
@@ -79,8 +78,9 @@ func itShouldContainSpecificStream(id int, name string,
streams []iggcon.Stream)
})
}
-func itShouldSuccessfullyCreateStream(id int, expectedName string, client
iggycli.Client) {
- stream, err := client.GetStream(iggcon.NewIdentifier(id))
+func itShouldSuccessfullyCreateStream(id uint32, expectedName string, client
iggycli.Client) {
+ streamIdentifier, _ := iggcon.NewIdentifier(id)
+ stream, err := client.GetStream(streamIdentifier)
itShouldNotReturnError(err)
ginkgo.It("should create stream with id "+string(rune(id)), func() {
@@ -92,8 +92,9 @@ func itShouldSuccessfullyCreateStream(id int, expectedName
string, client iggycl
})
}
-func itShouldSuccessfullyUpdateStream(id int, expectedName string, client
iggycli.Client) {
- stream, err := client.GetStream(iggcon.NewIdentifier(id))
+func itShouldSuccessfullyUpdateStream(id uint32, expectedName string, client
iggycli.Client) {
+ streamIdentifier, _ := iggcon.NewIdentifier(id)
+ stream, err := client.GetStream(streamIdentifier)
itShouldNotReturnError(err)
ginkgo.It("should update stream with id "+string(rune(id)), func() {
@@ -105,8 +106,9 @@ func itShouldSuccessfullyUpdateStream(id int, expectedName
string, client iggycl
})
}
-func itShouldSuccessfullyDeleteStream(id int, client iggycli.Client) {
- stream, err := client.GetStream(iggcon.NewIdentifier(id))
+func itShouldSuccessfullyDeleteStream(id uint32, client iggycli.Client) {
+ streamIdentifier, _ := iggcon.NewIdentifier(id)
+ stream, err := client.GetStream(streamIdentifier)
itShouldReturnSpecificIggyError(err, ierror.StreamIdNotFound)
ginkgo.It("should not return stream", func() {
@@ -114,6 +116,7 @@ func itShouldSuccessfullyDeleteStream(id int, client
iggycli.Client) {
})
}
-func deleteStreamAfterTests(streamId int, client iggycli.Client) {
- _ = client.DeleteStream(iggcon.NewIdentifier(streamId))
+func deleteStreamAfterTests(streamId uint32, client iggycli.Client) {
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ _ = client.DeleteStream(streamIdentifier)
}
diff --git a/bdd/go/tests/tcp_test/test_helpers.go
b/bdd/go/tests/tcp_test/test_helpers.go
index f30df5e6..075c8b42 100644
--- a/bdd/go/tests/tcp_test/test_helpers.go
+++ b/bdd/go/tests/tcp_test/test_helpers.go
@@ -18,6 +18,7 @@
package tcp_test
import (
+ iggcon "github.com/apache/iggy/foreign/go/contracts"
"math/rand"
"os"
"strings"
@@ -53,8 +54,17 @@ func createClient() iggycli.Client {
}
func createRandomUInt32() uint32 {
- rand.New(rand.NewSource(time.Now().UnixNano()))
- return rand.Uint32()
+ r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ var v uint32
+ for v == 0 {
+ v = r.Uint32()
+ }
+ return v
+}
+
+func randomU32Identifier() iggcon.Identifier {
+ id, _ := iggcon.NewIdentifier(createRandomUInt32())
+ return id
}
func createRandomString(length int) string {
diff --git a/bdd/go/tests/tcp_test/topic_feature_create.go
b/bdd/go/tests/tcp_test/topic_feature_create.go
index eb133d2a..abf89dbd 100644
--- a/bdd/go/tests/tcp_test/topic_feature_create.go
+++ b/bdd/go/tests/tcp_test/topic_feature_create.go
@@ -30,12 +30,13 @@ var _ = ginkgo.Describe("CREATE TOPIC:", func() {
ginkgo.Context("and tries to create topic unique name and id",
func() {
client := createAuthorizedConnection()
streamId, _ := successfullyCreateStream(prefix, client)
- topicId := 1
+ topicId := uint32(1)
replicationFactor := uint8(1)
name := createRandomString(32)
defer deleteStreamAfterTests(streamId, client)
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
_, err := client.CreateTopic(
- iggcon.NewIdentifier(streamId),
+ streamIdentifier,
name,
2,
1,
@@ -50,12 +51,13 @@ var _ = ginkgo.Describe("CREATE TOPIC:", func() {
ginkgo.Context("and tries to create topic for a non existing
stream", func() {
client := createAuthorizedConnection()
- streamId := int(createRandomUInt32())
- topicId := 1
+ streamId := createRandomUInt32()
+ topicId := uint32(1)
replicationFactor := uint8(1)
name := createRandomString(32)
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
_, err := client.CreateTopic(
- iggcon.NewIdentifier(streamId),
+ streamIdentifier,
name,
2,
1,
@@ -74,9 +76,10 @@ var _ = ginkgo.Describe("CREATE TOPIC:", func() {
_, name := successfullyCreateTopic(streamId, client)
replicationFactor := uint8(1)
- topicId := int(createRandomUInt32())
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicId := createRandomUInt32()
_, err := client.CreateTopic(
- iggcon.NewIdentifier(streamId),
+ streamIdentifier,
name,
2,
1,
@@ -92,10 +95,10 @@ var _ = ginkgo.Describe("CREATE TOPIC:", func() {
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
topicId, _ := successfullyCreateTopic(streamId, client)
-
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
replicationFactor := uint8(1)
_, err := client.CreateTopic(
- iggcon.NewIdentifier(streamId),
+ streamIdentifier,
createRandomString(32),
2,
1,
@@ -111,10 +114,11 @@ var _ = ginkgo.Describe("CREATE TOPIC:", func() {
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId,
createAuthorizedConnection())
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
replicationFactor := uint8(1)
- topicId := int(createRandomUInt32())
+ topicId := createRandomUInt32()
_, err := client.CreateTopic(
- iggcon.NewIdentifier(streamId),
+ streamIdentifier,
createRandomString(256),
2,
1,
@@ -131,9 +135,10 @@ var _ = ginkgo.Describe("CREATE TOPIC:", func() {
ginkgo.Context("and tries to create topic", func() {
client := createClient()
replicationFactor := uint8(1)
- topicId := 1
+ topicId := uint32(1)
+ streamIdentifier, _ := iggcon.NewIdentifier[uint32](10)
_, err := client.CreateTopic(
- iggcon.NewIdentifier(10),
+ streamIdentifier,
"name",
2,
1,
diff --git a/bdd/go/tests/tcp_test/topic_feature_delete.go
b/bdd/go/tests/tcp_test/topic_feature_delete.go
index c4079023..76b15070 100644
--- a/bdd/go/tests/tcp_test/topic_feature_delete.go
+++ b/bdd/go/tests/tcp_test/topic_feature_delete.go
@@ -31,7 +31,9 @@ var _ = ginkgo.Describe("DELETE TOPIC:", func() {
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
topicId, _ := successfullyCreateTopic(streamId, client)
- err :=
client.DeleteTopic(iggcon.NewIdentifier(streamId),
iggcon.NewIdentifier(topicId))
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ err := client.DeleteTopic(streamIdentifier,
topicIdentifier)
itShouldNotReturnError(err)
itShouldSuccessfullyDeleteTopic(streamId, topicId,
client)
@@ -41,19 +43,16 @@ var _ = ginkgo.Describe("DELETE TOPIC:", func() {
client := createAuthorizedConnection()
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
- topicId := int(createRandomUInt32())
-
- err :=
client.DeleteTopic(iggcon.NewIdentifier(streamId),
iggcon.NewIdentifier(topicId))
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ err := client.DeleteTopic(streamIdentifier,
randomU32Identifier())
itShouldReturnSpecificIggyError(err,
ierror.TopicIdNotFound)
})
ginkgo.Context("and tries to delete non-existing topic and
stream", func() {
client := createAuthorizedConnection()
- streamId := int(createRandomUInt32())
- topicId := int(createRandomUInt32())
- err :=
client.DeleteTopic(iggcon.NewIdentifier(streamId),
iggcon.NewIdentifier(topicId))
+ err := client.DeleteTopic(randomU32Identifier(),
randomU32Identifier())
itShouldReturnSpecificIggyError(err,
ierror.StreamIdNotFound)
})
@@ -62,7 +61,7 @@ var _ = ginkgo.Describe("DELETE TOPIC:", func() {
ginkgo.When("User is not logged in", func() {
ginkgo.Context("and tries to delete topic", func() {
client := createClient()
- err :=
client.DeleteTopic(iggcon.NewIdentifier(int(createRandomUInt32())),
iggcon.NewIdentifier(int(createRandomUInt32())))
+ err := client.DeleteTopic(randomU32Identifier(),
randomU32Identifier())
itShouldReturnUnauthenticatedError(err)
})
diff --git a/bdd/go/tests/tcp_test/topic_feature_get_all.go
b/bdd/go/tests/tcp_test/topic_feature_get_all.go
index 7c048e6d..16ee403b 100644
--- a/bdd/go/tests/tcp_test/topic_feature_get_all.go
+++ b/bdd/go/tests/tcp_test/topic_feature_get_all.go
@@ -30,7 +30,8 @@ var _ = ginkgo.Describe("GET ALL TOPICS:", func() {
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
topicId, name := successfullyCreateTopic(streamId,
client)
- topics, err :=
client.GetTopics(iggcon.NewIdentifier(streamId))
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topics, err := client.GetTopics(streamIdentifier)
itShouldNotReturnError(err)
itShouldContainSpecificTopic(topicId, name, topics)
@@ -40,7 +41,7 @@ var _ = ginkgo.Describe("GET ALL TOPICS:", func() {
ginkgo.When("User is not logged in", func() {
ginkgo.Context("and tries to get all topics", func() {
client := createClient()
- _, err :=
client.GetTopics(iggcon.NewIdentifier(int(createRandomUInt32())))
+ _, err := client.GetTopics(randomU32Identifier())
itShouldReturnUnauthenticatedError(err)
})
diff --git a/bdd/go/tests/tcp_test/topic_feature_get_by_id.go
b/bdd/go/tests/tcp_test/topic_feature_get_by_id.go
index 9b098891..cef09f50 100644
--- a/bdd/go/tests/tcp_test/topic_feature_get_by_id.go
+++ b/bdd/go/tests/tcp_test/topic_feature_get_by_id.go
@@ -31,7 +31,9 @@ var _ = ginkgo.Describe("GET TOPIC BY ID:", func() {
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
topicId, name := successfullyCreateTopic(streamId,
client)
- topic, err :=
client.GetTopic(iggcon.NewIdentifier(streamId), iggcon.NewIdentifier(topicId))
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ topic, err := client.GetTopic(streamIdentifier,
topicIdentifier)
itShouldNotReturnError(err)
itShouldReturnSpecificTopic(topicId, name, *topic)
@@ -39,9 +41,8 @@ var _ = ginkgo.Describe("GET TOPIC BY ID:", func() {
ginkgo.Context("and tries to get topic from non-existing
stream", func() {
client := createAuthorizedConnection()
- streamId := int(createRandomUInt32())
- _, err :=
client.GetTopic(iggcon.NewIdentifier(streamId),
iggcon.NewIdentifier(int(createRandomUInt32())))
+ _, err := client.GetTopic(randomU32Identifier(),
randomU32Identifier())
itShouldReturnSpecificIggyError(err,
ierror.TopicIdNotFound)
})
@@ -50,8 +51,9 @@ var _ = ginkgo.Describe("GET TOPIC BY ID:", func() {
client := createAuthorizedConnection()
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
- _, err :=
client.GetTopic(iggcon.NewIdentifier(streamId),
iggcon.NewIdentifier(int(createRandomUInt32())))
+ _, err := client.GetTopic(streamIdentifier,
randomU32Identifier())
itShouldReturnSpecificIggyError(err,
ierror.TopicIdNotFound)
})
diff --git a/bdd/go/tests/tcp_test/topic_feature_update.go
b/bdd/go/tests/tcp_test/topic_feature_update.go
index 499d5d89..8c606404 100644
--- a/bdd/go/tests/tcp_test/topic_feature_update.go
+++ b/bdd/go/tests/tcp_test/topic_feature_update.go
@@ -34,8 +34,11 @@ var _ = ginkgo.Describe("UPDATE TOPIC:", func() {
topicId, _ := successfullyCreateTopic(streamId, client)
newName := createRandomString(128)
replicationFactor := uint8(1)
- err :=
client.UpdateTopic(iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ err := client.UpdateTopic(
+ streamIdentifier,
+ topicIdentifier,
newName,
1,
1,
@@ -52,8 +55,11 @@ var _ = ginkgo.Describe("UPDATE TOPIC:", func() {
_, topic1Name := successfullyCreateTopic(streamId,
client)
topic2Id, _ := successfullyCreateTopic(streamId, client)
replicationFactor := uint8(1)
- err :=
client.UpdateTopic(iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topic2Id),
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topic2Identifier, _ := iggcon.NewIdentifier(topic2Id)
+ err := client.UpdateTopic(
+ streamIdentifier,
+ topic2Identifier,
topic1Name,
1,
0,
@@ -65,12 +71,10 @@ var _ = ginkgo.Describe("UPDATE TOPIC:", func() {
ginkgo.Context("and tries to update non-existing topic", func()
{
client := createAuthorizedConnection()
- streamId := int(createRandomUInt32())
- topicId := int(createRandomUInt32())
replicationFactor := uint8(1)
err := client.UpdateTopic(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
+ randomU32Identifier(),
+ randomU32Identifier(),
createRandomString(128),
1,
0,
@@ -84,11 +88,11 @@ var _ = ginkgo.Describe("UPDATE TOPIC:", func() {
client := createAuthorizedConnection()
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId,
createAuthorizedConnection())
- topicId := int(createRandomUInt32())
replicationFactor := uint8(1)
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
err := client.UpdateTopic(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
+ streamIdentifier,
+ randomU32Identifier(),
createRandomString(128),
1,
0,
@@ -104,9 +108,11 @@ var _ = ginkgo.Describe("UPDATE TOPIC:", func() {
defer deleteStreamAfterTests(streamId,
createAuthorizedConnection())
topicId, _ := successfullyCreateTopic(streamId, client)
replicationFactor := uint8(1)
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
err := client.UpdateTopic(
- iggcon.NewIdentifier(streamId),
- iggcon.NewIdentifier(topicId),
+ streamIdentifier,
+ topicIdentifier,
createRandomString(256),
1,
0,
@@ -120,7 +126,7 @@ var _ = ginkgo.Describe("UPDATE TOPIC:", func() {
ginkgo.When("User is not logged in", func() {
ginkgo.Context("and tries to update stream", func() {
client := createClient()
- err :=
client.UpdateStream(iggcon.NewIdentifier(int(createRandomUInt32())),
createRandomString(128))
+ err := client.UpdateStream(randomU32Identifier(),
createRandomString(128))
itShouldReturnUnauthenticatedError(err)
})
diff --git a/bdd/go/tests/tcp_test/topic_steps.go
b/bdd/go/tests/tcp_test/topic_steps.go
index 85484ba6..149c53d9 100644
--- a/bdd/go/tests/tcp_test/topic_steps.go
+++ b/bdd/go/tests/tcp_test/topic_steps.go
@@ -18,24 +18,24 @@
package tcp_test
import (
- "math"
- "strconv"
-
+ "fmt"
iggcon "github.com/apache/iggy/foreign/go/contracts"
ierror "github.com/apache/iggy/foreign/go/errors"
"github.com/apache/iggy/foreign/go/iggycli"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
+ "math"
)
//operations
-func successfullyCreateTopic(streamId int, client iggycli.Client) (int,
string) {
- topicId := int(createRandomUInt32())
+func successfullyCreateTopic(streamId uint32, client iggycli.Client) (uint32,
string) {
+ topicId := createRandomUInt32()
replicationFactor := uint8(1)
name := createRandomString(128)
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
_, err := client.CreateTopic(
- iggcon.NewIdentifier(streamId),
+ streamIdentifier,
name,
2,
1,
@@ -51,7 +51,7 @@ func successfullyCreateTopic(streamId int, client
iggycli.Client) (int, string)
//assertions
-func itShouldReturnSpecificTopic(id int, name string, topic
iggcon.TopicDetails) {
+func itShouldReturnSpecificTopic(id uint32, name string, topic
iggcon.TopicDetails) {
ginkgo.It("should fetch topic with id "+string(rune(id)), func() {
gomega.Expect(topic.Id).To(gomega.Equal(id))
})
@@ -61,7 +61,7 @@ func itShouldReturnSpecificTopic(id int, name string, topic
iggcon.TopicDetails)
})
}
-func itShouldContainSpecificTopic(id int, name string, topics []iggcon.Topic) {
+func itShouldContainSpecificTopic(id uint32, name string, topics
[]iggcon.Topic) {
ginkgo.It("should fetch at least one topic", func() {
gomega.Expect(len(topics)).NotTo(gomega.Equal(0))
})
@@ -77,7 +77,7 @@ func itShouldContainSpecificTopic(id int, name string, topics
[]iggcon.Topic) {
}
}
- ginkgo.It("should fetch topic with id "+strconv.Itoa(id), func() {
+ ginkgo.It(fmt.Sprintf("should fetch topic with id %d", id), func() {
gomega.Expect(found).To(gomega.BeTrue(), "Topic with id %d and
name %s not found", id, name)
gomega.Expect(topic.Id).To(gomega.Equal(id))
})
@@ -88,8 +88,10 @@ func itShouldContainSpecificTopic(id int, name string,
topics []iggcon.Topic) {
})
}
-func itShouldSuccessfullyCreateTopic(streamId int, topicId int, expectedName
string, client iggycli.Client) {
- topic, err := client.GetTopic(iggcon.NewIdentifier(streamId),
iggcon.NewIdentifier(topicId))
+func itShouldSuccessfullyCreateTopic(streamId uint32, topicId uint32,
expectedName string, client iggycli.Client) {
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ topic, err := client.GetTopic(streamIdentifier, topicIdentifier)
ginkgo.It("should create topic with id "+string(rune(topicId)), func() {
gomega.Expect(topic).NotTo(gomega.BeNil())
gomega.Expect(topic.Id).To(gomega.Equal(topicId))
@@ -102,8 +104,10 @@ func itShouldSuccessfullyCreateTopic(streamId int, topicId
int, expectedName str
itShouldNotReturnError(err)
}
-func itShouldSuccessfullyUpdateTopic(streamId int, topicId int, expectedName
string, client iggycli.Client) {
- topic, err := client.GetTopic(iggcon.NewIdentifier(streamId),
iggcon.NewIdentifier(topicId))
+func itShouldSuccessfullyUpdateTopic(streamId uint32, topicId uint32,
expectedName string, client iggycli.Client) {
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ topic, err := client.GetTopic(streamIdentifier, topicIdentifier)
ginkgo.It("should update topic with id "+string(rune(topicId)), func() {
gomega.Expect(topic).NotTo(gomega.BeNil())
@@ -117,8 +121,10 @@ func itShouldSuccessfullyUpdateTopic(streamId int, topicId
int, expectedName str
itShouldNotReturnError(err)
}
-func itShouldSuccessfullyDeleteTopic(streamId int, topicId int, client
iggycli.Client) {
- topic, err := client.GetTopic(iggcon.NewIdentifier(streamId),
iggcon.NewIdentifier(topicId))
+func itShouldSuccessfullyDeleteTopic(streamId uint32, topicId uint32, client
iggycli.Client) {
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+ topic, err := client.GetTopic(streamIdentifier, topicIdentifier)
itShouldReturnSpecificIggyError(err, ierror.TopicIdNotFound)
ginkgo.It("should not return topic", func() {
diff --git a/bdd/go/tests/tcp_test/users_feature_create.go
b/bdd/go/tests/tcp_test/users_feature_create.go
index 2b3c2e3a..11d40e02 100644
--- a/bdd/go/tests/tcp_test/users_feature_create.go
+++ b/bdd/go/tests/tcp_test/users_feature_create.go
@@ -46,7 +46,8 @@ var _ = ginkgo.Describe("CREATE USER:", func() {
SendMessages: true,
},
})
- defer deleteUserAfterTests(username, client)
+ identifier, _ := iggcon.NewIdentifier(username)
+ defer deleteUserAfterTests(identifier, client)
itShouldNotReturnError(err)
itShouldSuccessfullyCreateUser(username, client)
@@ -100,7 +101,8 @@ var _ = ginkgo.Describe("CREATE USER:", func() {
},
Streams: userStreamPermissions,
})
- defer deleteUserAfterTests(username, client)
+ identifier, _ := iggcon.NewIdentifier(username)
+ defer deleteUserAfterTests(identifier, client)
defer deleteStreamAfterTests(streamId, client)
itShouldNotReturnError(err)
diff --git a/bdd/go/tests/tcp_test/users_feature_delete.go
b/bdd/go/tests/tcp_test/users_feature_delete.go
index 5697e6b3..ce3de199 100644
--- a/bdd/go/tests/tcp_test/users_feature_delete.go
+++ b/bdd/go/tests/tcp_test/users_feature_delete.go
@@ -27,18 +27,18 @@ var _ = ginkgo.Describe("DELETE USER:", func() {
ginkgo.Context("tries to delete user with correct data", func()
{
client := createAuthorizedConnection()
userId :=
successfullyCreateUser(createRandomString(16), client)
-
- err :=
client.DeleteUser(iggcon.NewIdentifier(int(userId)))
+ userIdentifier, _ := iggcon.NewIdentifier(userId)
+ err := client.DeleteUser(userIdentifier)
itShouldNotReturnError(err)
- itShouldSuccessfullyDeleteUser(int(userId), client)
+ itShouldSuccessfullyDeleteUser(userId, client)
})
})
ginkgo.When("User is not logged in", func() {
ginkgo.Context("and tries to delete user", func() {
client := createClient()
- err :=
client.DeleteUser(iggcon.NewIdentifier(int(createRandomUInt32())))
+ err := client.DeleteUser(randomU32Identifier())
itShouldReturnUnauthenticatedError(err)
})
})
diff --git a/bdd/go/tests/tcp_test/users_feature_get_all.go
b/bdd/go/tests/tcp_test/users_feature_get_all.go
index 6e4976a1..60c508f5 100644
--- a/bdd/go/tests/tcp_test/users_feature_get_all.go
+++ b/bdd/go/tests/tcp_test/users_feature_get_all.go
@@ -18,6 +18,7 @@
package tcp_test
import (
+ iggcon "github.com/apache/iggy/foreign/go/contracts"
"github.com/onsi/ginkgo/v2"
)
@@ -27,7 +28,8 @@ var _ = ginkgo.Describe("GET USER:", func() {
client := createAuthorizedConnection()
name := createRandomString(16)
userId := successfullyCreateUser(name, client)
- defer deleteUserAfterTests(int(userId), client)
+ userIdentifier, _ := iggcon.NewIdentifier(userId)
+ defer deleteUserAfterTests(userIdentifier, client)
users, err := client.GetUsers()
diff --git a/bdd/go/tests/tcp_test/users_feature_get_by_id.go
b/bdd/go/tests/tcp_test/users_feature_get_by_id.go
index 9915f021..18d2c905 100644
--- a/bdd/go/tests/tcp_test/users_feature_get_by_id.go
+++ b/bdd/go/tests/tcp_test/users_feature_get_by_id.go
@@ -28,9 +28,10 @@ var _ = ginkgo.Describe("GET USER:", func() {
client := createAuthorizedConnection()
name := createRandomString(16)
userId := successfullyCreateUser(name, client)
- defer deleteUserAfterTests(int(userId), client)
+ userIdentifier, _ := iggcon.NewIdentifier(userId)
+ defer deleteUserAfterTests(userIdentifier, client)
- user, err :=
client.GetUser(iggcon.NewIdentifier(int(userId)))
+ user, err := client.GetUser(userIdentifier)
itShouldNotReturnError(err)
itShouldReturnSpecificUser(name, user.UserInfo)
diff --git a/bdd/go/tests/tcp_test/users_feature_password.go
b/bdd/go/tests/tcp_test/users_feature_password.go
index 3aac2225..4d6013d8 100644
--- a/bdd/go/tests/tcp_test/users_feature_password.go
+++ b/bdd/go/tests/tcp_test/users_feature_password.go
@@ -48,9 +48,10 @@ var _ = ginkgo.Describe("CHANGE PASSWORD:", func() {
},
})
itShouldNotReturnError(err)
- defer deleteUserAfterTests(username, client)
+ identifier, _ := iggcon.NewIdentifier(username)
+ defer deleteUserAfterTests(identifier, client)
- err =
client.ChangePassword(iggcon.NewIdentifier(username), password, "newPassword")
+ err = client.ChangePassword(identifier, password,
"newPassword")
itShouldNotReturnError(err)
//itShouldBePossibleToLogInWithCredentials(createRequest.Username,
request.NewPassword)
@@ -62,7 +63,7 @@ var _ = ginkgo.Describe("CHANGE PASSWORD:", func() {
client := createClient()
err := client.UpdatePermissions(
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ randomU32Identifier(),
&iggcon.Permissions{
Global: iggcon.GlobalPermissions{
ManageServers: false,
diff --git a/bdd/go/tests/tcp_test/users_feature_permissions.go
b/bdd/go/tests/tcp_test/users_feature_permissions.go
index 283cd130..972b0d93 100644
--- a/bdd/go/tests/tcp_test/users_feature_permissions.go
+++ b/bdd/go/tests/tcp_test/users_feature_permissions.go
@@ -27,10 +27,11 @@ var _ = ginkgo.Describe("UPDATE USER PERMISSIONS:", func() {
ginkgo.Context("tries to update permissions of existing user",
func() {
client := createAuthorizedConnection()
userId :=
successfullyCreateUser(createRandomString(16), client)
- defer deleteUserAfterTests(userId, client)
+ identifier, _ := iggcon.NewIdentifier(userId)
+ defer deleteUserAfterTests(identifier, client)
err := client.UpdatePermissions(
- iggcon.NewIdentifier(int(userId)),
+ identifier,
&iggcon.Permissions{
Global: iggcon.GlobalPermissions{
ManageServers: false,
@@ -56,7 +57,7 @@ var _ = ginkgo.Describe("UPDATE USER PERMISSIONS:", func() {
client := createClient()
username := createRandomString(16)
err := client.UpdateUser(
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ randomU32Identifier(),
&username,
nil,
)
diff --git a/bdd/go/tests/tcp_test/users_feature_update.go
b/bdd/go/tests/tcp_test/users_feature_update.go
index da54ab40..94efbbfb 100644
--- a/bdd/go/tests/tcp_test/users_feature_update.go
+++ b/bdd/go/tests/tcp_test/users_feature_update.go
@@ -27,11 +27,12 @@ var _ = ginkgo.Describe("UPDATE USER:", func() {
ginkgo.Context("tries to update user existing user", func() {
client := createAuthorizedConnection()
userId :=
successfullyCreateUser(createRandomString(16), client)
- defer deleteUserAfterTests(userId, client)
+ identifier, _ := iggcon.NewIdentifier(userId)
+ defer deleteUserAfterTests(identifier, client)
username := createRandomString(16)
err := client.UpdateUser(
- iggcon.NewIdentifier(int(userId)),
+ identifier,
&username,
nil,
)
@@ -47,7 +48,7 @@ var _ = ginkgo.Describe("UPDATE USER:", func() {
username := createRandomString(16)
err := client.UpdateUser(
- iggcon.NewIdentifier(int(createRandomUInt32())),
+ randomU32Identifier(),
&username,
nil,
)
diff --git a/bdd/go/tests/tcp_test/users_steps.go
b/bdd/go/tests/tcp_test/users_steps.go
index f6b01b66..451788af 100644
--- a/bdd/go/tests/tcp_test/users_steps.go
+++ b/bdd/go/tests/tcp_test/users_steps.go
@@ -46,7 +46,8 @@ func successfullyCreateUser(name string, client
iggycli.Client) uint32 {
},
})
itShouldNotReturnError(err)
- user, err := client.GetUser(iggcon.NewIdentifier(name))
+ nameIdentifier, _ := iggcon.NewIdentifier(name)
+ user, err := client.GetUser(nameIdentifier)
itShouldNotReturnError(err)
return user.Id
@@ -55,7 +56,8 @@ func successfullyCreateUser(name string, client
iggycli.Client) uint32 {
// ASSERTIONS
func itShouldSuccessfullyCreateUser(name string, client iggycli.Client) {
- user, err := client.GetUser(iggcon.NewIdentifier(name))
+ nameIdentifier, _ := iggcon.NewIdentifier(name)
+ user, err := client.GetUser(nameIdentifier)
itShouldNotReturnError(err)
@@ -65,7 +67,8 @@ func itShouldSuccessfullyCreateUser(name string, client
iggycli.Client) {
}
func itShouldSuccessfullyCreateUserWithPermissions(name string, client
iggycli.Client, permissions map[int]*iggcon.StreamPermissions) {
- user, err := client.GetUser(iggcon.NewIdentifier(name))
+ nameIdentifier, _ := iggcon.NewIdentifier(name)
+ user, err := client.GetUser(nameIdentifier)
itShouldNotReturnError(err)
@@ -95,7 +98,8 @@ func itShouldSuccessfullyCreateUserWithPermissions(name
string, client iggycli.C
}
func itShouldSuccessfullyUpdateUser(id uint32, name string, client
iggycli.Client) {
- user, err := client.GetUser(iggcon.NewIdentifier(name))
+ nameIdentifier, _ := iggcon.NewIdentifier(name)
+ user, err := client.GetUser(nameIdentifier)
itShouldNotReturnError(err)
@@ -108,8 +112,9 @@ func itShouldSuccessfullyUpdateUser(id uint32, name string,
client iggycli.Clien
})
}
-func itShouldSuccessfullyDeleteUser(userId int, client iggycli.Client) {
- user, err := client.GetUser(iggcon.NewIdentifier(userId))
+func itShouldSuccessfullyDeleteUser(userId uint32, client iggycli.Client) {
+ identifier, _ := iggcon.NewIdentifier(userId)
+ user, err := client.GetUser(identifier)
itShouldReturnSpecificError(err, "resource_not_found")
ginkgo.It("should not return user", func() {
@@ -118,7 +123,8 @@ func itShouldSuccessfullyDeleteUser(userId int, client
iggycli.Client) {
}
func itShouldSuccessfullyUpdateUserPermissions(userId uint32, client
iggycli.Client) {
- user, err := client.GetUser(iggcon.NewIdentifier(int(userId)))
+ identifier, _ := iggcon.NewIdentifier(userId)
+ user, err := client.GetUser(identifier)
itShouldNotReturnError(err)
@@ -166,6 +172,6 @@ func itShouldContainSpecificUser(name string, users
[]iggcon.UserInfo) {
//CLEANUP
-func deleteUserAfterTests(name any, client iggycli.Client) {
- _ = client.DeleteUser(iggcon.NewIdentifier(name))
+func deleteUserAfterTests(identifier iggcon.Identifier, client iggycli.Client)
{
+ _ = client.DeleteUser(identifier)
}
diff --git a/foreign/go/benchmarks/send_messages_benchmark_test.go
b/foreign/go/benchmarks/send_messages_benchmark_test.go
index 78978526..301f5ff8 100644
--- a/foreign/go/benchmarks/send_messages_benchmark_test.go
+++ b/foreign/go/benchmarks/send_messages_benchmark_test.go
@@ -60,7 +60,7 @@ func BenchmarkSendMessage(b *testing.B) {
}
for index, value := range clients {
- err := ensureInfrastructureIsInitialized(value,
startingStreamId+index)
+ err := ensureInfrastructureIsInitialized(value,
uint32(startingStreamId+index))
if err != nil {
panic("COULD NOT INITIALIZE INFRASTRUCTURE")
}
@@ -96,24 +96,26 @@ func BenchmarkSendMessage(b *testing.B) {
fmt.Printf("Summarized Average Throughput: %.2f MB/s\n",
aggregateThroughput)
for index, value := range clients {
- err := cleanupInfrastructure(value, startingStreamId+index)
+ err := cleanupInfrastructure(value,
uint32(startingStreamId+index))
if err != nil {
panic("COULD NOT CLEANUP INFRASTRUCTURE")
}
}
}
-func ensureInfrastructureIsInitialized(cli iggycli.Client, streamId int) error
{
- if _, streamErr := cli.GetStream(iggcon.NewIdentifier(streamId));
streamErr != nil {
- uint32StreamId := uint32(streamId)
- _, streamErr =
cli.CreateStream("benchmark"+fmt.Sprint(streamId), &uint32StreamId)
+func ensureInfrastructureIsInitialized(cli iggycli.Client, streamId uint32)
error {
+ streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+ if _, streamErr := cli.GetStream(streamIdentifier); streamErr != nil {
+ _, streamErr =
cli.CreateStream("benchmark"+fmt.Sprint(streamId), &streamId)
if streamErr != nil {
panic(streamErr)
}
}
- if _, topicErr := cli.GetTopic(iggcon.NewIdentifier(streamId),
iggcon.NewIdentifier(1)); topicErr != nil {
+
+ topicIdentifier, _ := iggcon.NewIdentifier(uint32(1))
+ if _, topicErr := cli.GetTopic(streamIdentifier, topicIdentifier);
topicErr != nil {
_, topicErr = cli.CreateTopic(
- iggcon.NewIdentifier(streamId),
+ streamIdentifier,
"benchmark",
1,
1,
@@ -130,8 +132,9 @@ func ensureInfrastructureIsInitialized(cli iggycli.Client,
streamId int) error {
return nil
}
-func cleanupInfrastructure(cli iggycli.Client, streamId int) error {
- return cli.DeleteStream(iggcon.NewIdentifier(streamId))
+func cleanupInfrastructure(cli iggycli.Client, streamId uint32) error {
+ streamIdent, _ := iggcon.NewIdentifier(streamId)
+ return cli.DeleteStream(streamIdent)
}
// CreateMessages creates messages with random payloads.
@@ -159,15 +162,16 @@ func SendMessage(cli iggycli.Client, producerNumber,
messagesCount, messagesBatc
totalMessagesBytes := int64(totalMessages * messageSize)
fmt.Printf("Executing Send Messages command for producer %d, messages
count %d, with size %d bytes\n", producerNumber, totalMessages,
totalMessagesBytes)
- streamId := iggcon.NewIdentifier(startingStreamId + producerNumber)
+ streamId, _ := iggcon.NewIdentifier(uint32(startingStreamId +
producerNumber))
messages := CreateMessages(messagesCount, messageSize)
latencies := make([]time.Duration, 0)
for i := 0; i < messagesBatch; i++ {
startTime := time.Now()
+ topicIdentifier, _ := iggcon.NewIdentifier(uint32(topicId))
_ = cli.SendMessages(
streamId,
- iggcon.NewIdentifier(topicId),
+ topicIdentifier,
iggcon.PartitionId(1),
messages,
)
diff --git a/foreign/go/binary_serialization/binary_request_serializer.go
b/foreign/go/binary_serialization/binary_request_serializer.go
index 3f2ce3d3..d69e3b13 100644
--- a/foreign/go/binary_serialization/binary_request_serializer.go
+++ b/foreign/go/binary_serialization/binary_request_serializer.go
@@ -319,9 +319,9 @@ func SerializeUpdateUserPermissionsRequest(request
iggcon.UpdatePermissionsReque
return bytes
}
-func SerializeInt(value int) []byte {
+func SerializeUint32(value uint32) []byte {
bytes := make([]byte, 4)
- binary.LittleEndian.PutUint32(bytes, uint32(value))
+ binary.LittleEndian.PutUint32(bytes, value)
return bytes
}
diff --git a/foreign/go/binary_serialization/binary_response_deserializer.go
b/foreign/go/binary_serialization/binary_response_deserializer.go
index 9b29511c..e2b746af 100644
--- a/foreign/go/binary_serialization/binary_response_deserializer.go
+++ b/foreign/go/binary_serialization/binary_response_deserializer.go
@@ -36,7 +36,7 @@ func DeserializeLogInResponse(payload []byte)
*iggcon.IdentityInfo {
}
func DeserializeOffset(payload []byte) *iggcon.ConsumerOffsetInfo {
- partitionId := int(binary.LittleEndian.Uint32(payload[0:4]))
+ partitionId := binary.LittleEndian.Uint32(payload[0:4])
currentOffset := binary.LittleEndian.Uint64(payload[4:12])
storedOffset := binary.LittleEndian.Uint64(payload[12:20])
@@ -72,9 +72,9 @@ func DeserializeStreams(payload []byte) []iggcon.Stream {
}
func DeserializeToStream(payload []byte, position int) (iggcon.Stream, int) {
- id := int(binary.LittleEndian.Uint32(payload[position : position+4]))
+ id := binary.LittleEndian.Uint32(payload[position : position+4])
createdAt := binary.LittleEndian.Uint64(payload[position+4 :
position+12])
- topicsCount := int(binary.LittleEndian.Uint32(payload[position+12 :
position+16]))
+ topicsCount := binary.LittleEndian.Uint32(payload[position+12 :
position+16])
sizeBytes := binary.LittleEndian.Uint64(payload[position+16 :
position+24])
messagesCount := binary.LittleEndian.Uint64(payload[position+24 :
position+32])
nameLength := int(payload[position+32])
@@ -198,9 +198,9 @@ func DeserializeTopic(payload []byte)
(*iggcon.TopicDetails, error) {
func DeserializeToTopic(payload []byte, position int) (iggcon.Topic, int,
error) {
topic := iggcon.Topic{}
- topic.Id = int(binary.LittleEndian.Uint32(payload[position :
position+4]))
- topic.CreatedAt = int(binary.LittleEndian.Uint64(payload[position+4 :
position+12]))
- topic.PartitionsCount =
int(binary.LittleEndian.Uint32(payload[position+12 : position+16]))
+ topic.Id = binary.LittleEndian.Uint32(payload[position : position+4])
+ topic.CreatedAt = binary.LittleEndian.Uint64(payload[position+4 :
position+12])
+ topic.PartitionsCount = binary.LittleEndian.Uint32(payload[position+12
: position+16])
topic.MessageExpiry = time.Microsecond *
time.Duration(int(binary.LittleEndian.Uint64(payload[position+16:position+24])))
topic.CompressionAlgorithm = payload[position+24]
topic.MaxTopicSize = binary.LittleEndian.Uint64(payload[position+25 :
position+33])
@@ -216,9 +216,9 @@ func DeserializeToTopic(payload []byte, position int)
(iggcon.Topic, int, error)
}
func DeserializePartition(payload []byte, position int)
(iggcon.PartitionContract, int) {
- id := int(binary.LittleEndian.Uint32(payload[position : position+4]))
+ id := binary.LittleEndian.Uint32(payload[position : position+4])
createdAt := binary.LittleEndian.Uint64(payload[position+4 :
position+12])
- segmentsCount := int(binary.LittleEndian.Uint32(payload[position+12 :
position+16]))
+ segmentsCount := binary.LittleEndian.Uint32(payload[position+12 :
position+16])
currentOffset := binary.LittleEndian.Uint64(payload[position+16 :
position+24])
sizeBytes := binary.LittleEndian.Uint64(payload[position+24 :
position+32])
messagesCount := binary.LittleEndian.Uint64(payload[position+32 :
position+40])
@@ -252,9 +252,9 @@ func DeserializeConsumerGroups(payload []byte)
[]iggcon.ConsumerGroup {
}
func DeserializeToConsumerGroup(payload []byte, position int)
(*iggcon.ConsumerGroup, int) {
- id := int(binary.LittleEndian.Uint32(payload[position : position+4]))
- partitionsCount := int(binary.LittleEndian.Uint32(payload[position+4 :
position+8]))
- membersCount := int(binary.LittleEndian.Uint32(payload[position+8 :
position+12]))
+ id := binary.LittleEndian.Uint32(payload[position : position+4])
+ partitionsCount := binary.LittleEndian.Uint32(payload[position+4 :
position+8])
+ membersCount := binary.LittleEndian.Uint32(payload[position+8 :
position+12])
nameLength := int(payload[position+12])
name := string(payload[position+13 : position+13+nameLength])
@@ -501,14 +501,14 @@ func DeserializeClient(payload []byte)
*iggcon.ClientInfoDetails {
for position < length {
for i := uint32(0); i < clientInfo.ConsumerGroupsCount; i++ {
- streamId :=
int32(binary.LittleEndian.Uint32(payload[position : position+4]))
- topicId :=
int32(binary.LittleEndian.Uint32(payload[position+4 : position+8]))
- consumerGroupId :=
int32(binary.LittleEndian.Uint32(payload[position+8 : position+12]))
+ streamId := binary.LittleEndian.Uint32(payload[position
: position+4])
+ topicId :=
binary.LittleEndian.Uint32(payload[position+4 : position+8])
+ consumerGroupId :=
binary.LittleEndian.Uint32(payload[position+8 : position+12])
consumerGroup := iggcon.ConsumerGroupInfo{
- StreamId: int(streamId),
- TopicId: int(topicId),
- ConsumerGroupId: int(consumerGroupId),
+ StreamId: streamId,
+ TopicId: topicId,
+ ConsumerGroupId: consumerGroupId,
}
consumerGroups = append(consumerGroups, consumerGroup)
position += 12
diff --git a/foreign/go/binary_serialization/create_topic_serializer.go
b/foreign/go/binary_serialization/create_topic_serializer.go
index c3d21a66..5fc2ed5b 100644
--- a/foreign/go/binary_serialization/create_topic_serializer.go
+++ b/foreign/go/binary_serialization/create_topic_serializer.go
@@ -32,12 +32,12 @@ type TcpCreateTopicRequest struct {
MaxTopicSize uint64 `json:"maxTopicSize"`
Name string `json:"name"`
ReplicationFactor *uint8 `json:"replicationFactor"`
- TopicId *int `json:"topicId"`
+ TopicId *uint32 `json:"topicId"`
}
func (request *TcpCreateTopicRequest) Serialize() []byte {
if request.TopicId == nil {
- request.TopicId = new(int)
+ request.TopicId = new(uint32)
}
if request.ReplicationFactor == nil {
request.ReplicationFactor = new(uint8)
diff --git
a/foreign/go/binary_serialization/fetch_messages_request_serializer_test.go
b/foreign/go/binary_serialization/fetch_messages_request_serializer_test.go
index bf511c78..4b3f721b 100644
--- a/foreign/go/binary_serialization/fetch_messages_request_serializer_test.go
+++ b/foreign/go/binary_serialization/fetch_messages_request_serializer_test.go
@@ -25,14 +25,17 @@ import (
func TestSerialize_TcpFetchMessagesRequest(t *testing.T) {
partitionId := uint32(123)
+ consumerId, _ := iggcon.NewIdentifier(uint32(42))
+ streamId, _ := iggcon.NewIdentifier("test_stream_id")
+ topicId, _ := iggcon.NewIdentifier("test_topic_id")
// Create a sample TcpFetchMessagesRequest
request := TcpFetchMessagesRequest{
Consumer: iggcon.Consumer{
Kind: iggcon.ConsumerKindSingle,
- Id: iggcon.NewIdentifier(42),
+ Id: consumerId,
},
- StreamId: iggcon.NewIdentifier("test_stream_id"),
- TopicId: iggcon.NewIdentifier("test_topic_id"),
+ StreamId: streamId,
+ TopicId: topicId,
PartitionId: &partitionId,
Strategy: iggcon.FirstPollingStrategy(),
Count: 100,
diff --git a/foreign/go/binary_serialization/identifier_serializer.go
b/foreign/go/binary_serialization/identifier_serializer.go
index 79098f25..fb823a73 100644
--- a/foreign/go/binary_serialization/identifier_serializer.go
+++ b/foreign/go/binary_serialization/identifier_serializer.go
@@ -18,31 +18,14 @@
package binaryserialization
import (
- "encoding/binary"
-
iggcon "github.com/apache/iggy/foreign/go/contracts"
)
-const (
- idKindOffset = 0
- idLengthOffset = 1
- stringIdLength = 2
- numericIdLength = 4
-)
-
func SerializeIdentifier(identifier iggcon.Identifier) []byte {
- bytes := make([]byte, int(identifier.Length)+2)
- bytes[idKindOffset] = byte(identifier.Kind)
- bytes[idLengthOffset] = byte(identifier.Length)
-
- switch identifier.Kind {
- case iggcon.StringId:
- valAsString := identifier.Value.(string)
- copy(bytes[stringIdLength:], []byte(valAsString))
- case iggcon.NumericId:
- valAsInt := identifier.Value.(int)
-
binary.LittleEndian.PutUint32(bytes[stringIdLength:stringIdLength+numericIdLength],
uint32(valAsInt))
- }
+ bytes := make([]byte, identifier.Length+2)
+ bytes[0] = byte(identifier.Kind)
+ bytes[1] = byte(identifier.Length)
+ copy(bytes[2:], identifier.Value)
return bytes
}
diff --git a/foreign/go/binary_serialization/identifier_serializer_test.go
b/foreign/go/binary_serialization/identifier_serializer_test.go
index dc207223..3c96d7d9 100644
--- a/foreign/go/binary_serialization/identifier_serializer_test.go
+++ b/foreign/go/binary_serialization/identifier_serializer_test.go
@@ -18,6 +18,8 @@
package binaryserialization
import (
+ "errors"
+ ierror "github.com/apache/iggy/foreign/go/errors"
"testing"
iggcon "github.com/apache/iggy/foreign/go/contracts"
@@ -25,7 +27,7 @@ import (
func TestSerializeIdentifier_StringId(t *testing.T) {
// Test case for StringId
- identifier := iggcon.NewIdentifier("Hello")
+ identifier, _ := iggcon.NewIdentifier("Hello")
// Serialize the identifier
serialized := SerializeIdentifier(identifier)
@@ -45,7 +47,7 @@ func TestSerializeIdentifier_StringId(t *testing.T) {
func TestSerializeIdentifier_NumericId(t *testing.T) {
// Test case for NumericId
- identifier := iggcon.NewIdentifier(123)
+ identifier, _ := iggcon.NewIdentifier(uint32(123))
// Serialize the identifier
serialized := SerializeIdentifier(identifier)
@@ -65,19 +67,10 @@ func TestSerializeIdentifier_NumericId(t *testing.T) {
func TestSerializeIdentifier_EmptyStringId(t *testing.T) {
// Test case for an empty StringId
- identifier := iggcon.NewIdentifier("")
-
- // Serialize the identifier
- serialized := SerializeIdentifier(identifier)
-
- // Expected serialized bytes for an empty StringId
- expected := []byte{
- 0x02, // Kind (StringId)
- 0x00, // Length (0)
- }
+ _, err := iggcon.NewIdentifier("")
// Check if the serialized bytes match the expected bytes
- if !areBytesEqual(serialized, expected) {
- t.Errorf("Serialized bytes are incorrect for an empty StringId.
\nExpected:\t%v\nGot:\t\t%v", expected, serialized)
+ if !errors.Is(err, ierror.InvalidIdentifier) {
+ t.Errorf("Expected error: %v, got: %v",
ierror.InvalidIdentifier, err)
}
}
diff --git
a/foreign/go/binary_serialization/send_messages_request_serializer_test.go
b/foreign/go/binary_serialization/send_messages_request_serializer_test.go
index 4bf905f7..29807c34 100644
--- a/foreign/go/binary_serialization/send_messages_request_serializer_test.go
+++ b/foreign/go/binary_serialization/send_messages_request_serializer_test.go
@@ -26,9 +26,11 @@ import (
func TestSerialize_SendMessagesRequest(t *testing.T) {
message1 := generateTestMessage("data1")
+ streamId, _ := iggcon.NewIdentifier("test_stream_id")
+ topicId, _ := iggcon.NewIdentifier("test_topic_id")
request := TcpSendMessagesRequest{
- StreamId: iggcon.NewIdentifier("test_stream_id"),
- TopicId: iggcon.NewIdentifier("test_topic_id"),
+ StreamId: streamId,
+ TopicId: topicId,
Partitioning: iggcon.PartitionId(1),
Messages: []iggcon.IggyMessage{
message1,
diff --git a/foreign/go/binary_serialization/stats_serializer.go
b/foreign/go/binary_serialization/stats_serializer.go
index 640b5e13..101cc073 100644
--- a/foreign/go/binary_serialization/stats_serializer.go
+++ b/foreign/go/binary_serialization/stats_serializer.go
@@ -51,7 +51,7 @@ const (
)
func (stats *TcpStats) Deserialize(payload []byte) error {
- stats.ProcessId = int(binary.LittleEndian.Uint32(payload[processIDPos :
processIDPos+4]))
+ stats.ProcessId = binary.LittleEndian.Uint32(payload[processIDPos :
processIDPos+4])
stats.CpuUsage =
math.Float32frombits(binary.LittleEndian.Uint32(payload[cpuUsagePos :
cpuUsagePos+4]))
stats.TotalCpuUsage =
math.Float32frombits(binary.LittleEndian.Uint32(payload[totalCpuUsagePos :
totalCpuUsagePos+4]))
stats.MemoryUsage = binary.LittleEndian.Uint64(payload[memoryUsagePos :
memoryUsagePos+8])
@@ -62,13 +62,13 @@ func (stats *TcpStats) Deserialize(payload []byte) error {
stats.ReadBytes = binary.LittleEndian.Uint64(payload[readBytesPos :
readBytesPos+8])
stats.WrittenBytes = binary.LittleEndian.Uint64(payload[writtenBytesPos
: writtenBytesPos+8])
stats.MessagesSizeBytes =
binary.LittleEndian.Uint64(payload[messagesSizeBytesPos :
messagesSizeBytesPos+8])
- stats.StreamsCount =
int(binary.LittleEndian.Uint32(payload[streamsCountPos : streamsCountPos+4]))
- stats.TopicsCount =
int(binary.LittleEndian.Uint32(payload[topicsCountPos : topicsCountPos+4]))
- stats.PartitionsCount =
int(binary.LittleEndian.Uint32(payload[partitionsCountPos :
partitionsCountPos+4]))
- stats.SegmentsCount =
int(binary.LittleEndian.Uint32(payload[segmentsCountPos : segmentsCountPos+4]))
+ stats.StreamsCount = binary.LittleEndian.Uint32(payload[streamsCountPos
: streamsCountPos+4])
+ stats.TopicsCount = binary.LittleEndian.Uint32(payload[topicsCountPos :
topicsCountPos+4])
+ stats.PartitionsCount =
binary.LittleEndian.Uint32(payload[partitionsCountPos : partitionsCountPos+4])
+ stats.SegmentsCount =
binary.LittleEndian.Uint32(payload[segmentsCountPos : segmentsCountPos+4])
stats.MessagesCount =
binary.LittleEndian.Uint64(payload[messagesCountPos : messagesCountPos+8])
- stats.ClientsCount =
int(binary.LittleEndian.Uint32(payload[clientsCountPos : clientsCountPos+4]))
- stats.ConsumerGroupsCount =
int(binary.LittleEndian.Uint32(payload[consumerGroupsCountPos :
consumerGroupsCountPos+4]))
+ stats.ClientsCount = binary.LittleEndian.Uint32(payload[clientsCountPos
: clientsCountPos+4])
+ stats.ConsumerGroupsCount =
binary.LittleEndian.Uint32(payload[consumerGroupsCountPos :
consumerGroupsCountPos+4])
position := consumerGroupsCountPos + 4
hostnameLength := int(binary.LittleEndian.Uint32(payload[position :
position+4]))
diff --git a/foreign/go/binary_serialization/update_stream_serializer_test.go
b/foreign/go/binary_serialization/update_stream_serializer_test.go
index d57566cb..ab0f2230 100644
--- a/foreign/go/binary_serialization/update_stream_serializer_test.go
+++ b/foreign/go/binary_serialization/update_stream_serializer_test.go
@@ -24,8 +24,9 @@ import (
)
func TestSerialize_UpdateStream(t *testing.T) {
+ streamId, _ := iggcon.NewIdentifier("stream")
request := TcpUpdateStreamRequest{
- StreamId: iggcon.NewIdentifier("stream"),
+ StreamId: streamId,
Name: "update_stream",
}
diff --git a/foreign/go/binary_serialization/update_topic_serializer_test.go
b/foreign/go/binary_serialization/update_topic_serializer_test.go
index 355cea07..3c4e14f2 100644
--- a/foreign/go/binary_serialization/update_topic_serializer_test.go
+++ b/foreign/go/binary_serialization/update_topic_serializer_test.go
@@ -24,9 +24,11 @@ import (
)
func TestSerialize_UpdateTopic(t *testing.T) {
+ streamId, _ := iggcon.NewIdentifier("stream")
+ topicId, _ := iggcon.NewIdentifier(uint32(1))
request := TcpUpdateTopicRequest{
- StreamId: iggcon.NewIdentifier("stream"),
- TopicId: iggcon.NewIdentifier(1),
+ StreamId: streamId,
+ TopicId: topicId,
Name: "update_topic",
MessageExpiry: 100000,
MaxTopicSize: 100,
diff --git a/foreign/go/contracts/consumer_groups.go
b/foreign/go/contracts/consumer_groups.go
index f8b6765e..7178703b 100644
--- a/foreign/go/contracts/consumer_groups.go
+++ b/foreign/go/contracts/consumer_groups.go
@@ -18,10 +18,10 @@
package iggcon
type ConsumerGroup struct {
- Id int `json:"id"`
+ Id uint32 `json:"id"`
Name string `json:"name"`
- PartitionsCount int `json:"partitionsCount"`
- MembersCount int `json:"membersCount"`
+ PartitionsCount uint32 `json:"partitionsCount"`
+ MembersCount uint32 `json:"membersCount"`
}
type ConsumerGroupDetails struct {
@@ -30,9 +30,9 @@ type ConsumerGroupDetails struct {
}
type ConsumerGroupMember struct {
- ID int
- PartitionsCount int
- Partitions []int
+ ID uint32
+ PartitionsCount uint32
+ Partitions []uint32
}
type CreateConsumerGroupRequest struct {
@@ -61,7 +61,7 @@ type LeaveConsumerGroupRequest struct {
}
type ConsumerGroupInfo struct {
- StreamId int `json:"streamId"`
- TopicId int `json:"topicId"`
- ConsumerGroupId int `json:"consumerGroupId"`
+ StreamId uint32 `json:"streamId"`
+ TopicId uint32 `json:"topicId"`
+ ConsumerGroupId uint32 `json:"consumerGroupId"`
}
diff --git a/foreign/go/contracts/identifier.go
b/foreign/go/contracts/identifier.go
index c9fcc6d6..10c138a7 100644
--- a/foreign/go/contracts/identifier.go
+++ b/foreign/go/contracts/identifier.go
@@ -17,35 +17,77 @@
package iggcon
+import (
+ "encoding/binary"
+ ierror "github.com/apache/iggy/foreign/go/errors"
+)
+
type Identifier struct {
Kind IdKind
Length int
- Value any
+ Value []byte
}
-type IdKind int
+type IdKind uint8
const (
NumericId IdKind = 1
StringId IdKind = 2
)
-func NewIdentifier(id any) Identifier {
- var kind IdKind
- var length int
-
- switch v := id.(type) {
- case int:
- kind = NumericId
- length = 4
+// NewIdentifier create a new identifier
+func NewIdentifier[T uint32 | string](value T) (Identifier, error) {
+ switch v := any(value).(type) {
+ case uint32:
+ return newNumericIdentifier(v)
case string:
- kind = StringId
- length = len(v)
+ return newStringIdentifier(v)
}
+ return Identifier{}, ierror.InvalidIdentifier
+}
+// newNumericIdentifier creates a new identifier from the given numeric value.
+func newNumericIdentifier(value uint32) (Identifier, error) {
+ if value == 0 {
+ return Identifier{}, ierror.InvalidIdentifier
+ }
+
+ val := make([]byte, 4)
+ binary.LittleEndian.PutUint32(val, value)
return Identifier{
- Kind: kind,
- Length: length,
- Value: id,
+ Kind: NumericId,
+ Length: 4,
+ Value: val,
+ }, nil
+}
+
+// NewStringIdentifier creates a new identifier from the given string value.
+func newStringIdentifier(value string) (Identifier, error) {
+ length := len(value)
+ if length == 0 || length > 255 {
+ return Identifier{}, ierror.InvalidIdentifier
}
+ return Identifier{
+ Kind: StringId,
+ Length: len(value),
+ Value: []byte(value),
+ }, nil
+}
+
+// Uint32 returns the numeric value of the identifier.
+func (id Identifier) Uint32() (uint32, error) {
+ if id.Kind != NumericId || id.Length != 4 {
+ return 0, ierror.ResourceNotFound
+ }
+
+ return binary.LittleEndian.Uint32(id.Value), nil
+}
+
+// String returns the string value of the identifier.
+func (id Identifier) String() (string, error) {
+ if id.Kind != StringId {
+ return "", ierror.InvalidIdentifier
+ }
+
+ return string(id.Value), nil
}
diff --git a/foreign/go/contracts/messages.go b/foreign/go/contracts/messages.go
index b20e9b11..a9e74d42 100644
--- a/foreign/go/contracts/messages.go
+++ b/foreign/go/contracts/messages.go
@@ -49,7 +49,7 @@ type PollMessageRequest struct {
StreamId Identifier `json:"streamId"`
TopicId Identifier `json:"topicId"`
Consumer Consumer `json:"consumer"`
- PartitionId int `json:"partitionId"`
+ PartitionId uint32 `json:"partitionId"`
PollingStrategy PollingStrategy `json:"pollingStrategy"`
Count int `json:"count"`
AutoCommit bool `json:"autoCommit"`
diff --git a/foreign/go/contracts/offets.go b/foreign/go/contracts/offets.go
index 2522df88..7c4a1213 100644
--- a/foreign/go/contracts/offets.go
+++ b/foreign/go/contracts/offets.go
@@ -33,7 +33,7 @@ type GetConsumerOffsetRequest struct {
}
type ConsumerOffsetInfo struct {
- PartitionId int `json:"partitionId"`
+ PartitionId uint32 `json:"partitionId"`
CurrentOffset uint64 `json:"currentOffset"`
StoredOffset uint64 `json:"storedOffset"`
}
diff --git a/foreign/go/contracts/partitions.go
b/foreign/go/contracts/partitions.go
index 6608211b..5b74effc 100644
--- a/foreign/go/contracts/partitions.go
+++ b/foreign/go/contracts/partitions.go
@@ -25,10 +25,10 @@ import (
)
type PartitionContract struct {
- Id int `json:"id"`
+ Id uint32 `json:"id"`
MessagesCount uint64 `json:"messagesCount"`
CreatedAt uint64 `json:"createdAt"`
- SegmentsCount int `json:"segmentsCount"`
+ SegmentsCount uint32 `json:"segmentsCount"`
CurrentOffset uint64 `json:"currentOffset"`
SizeBytes uint64 `json:"sizeBytes"`
}
diff --git a/foreign/go/contracts/stats.go b/foreign/go/contracts/stats.go
index 457fa255..1409b01e 100644
--- a/foreign/go/contracts/stats.go
+++ b/foreign/go/contracts/stats.go
@@ -18,7 +18,7 @@
package iggcon
type Stats struct {
- ProcessId int `json:"process_id"`
+ ProcessId uint32 `json:"process_id"`
CpuUsage float32 `json:"cpu_usage"`
TotalCpuUsage float32 `json:"total_cpu_usage"`
MemoryUsage uint64 `json:"memory_usage"`
@@ -29,13 +29,13 @@ type Stats struct {
ReadBytes uint64 `json:"read_bytes"`
WrittenBytes uint64 `json:"written_bytes"`
MessagesSizeBytes uint64 `json:"messages_size_bytes"`
- StreamsCount int `json:"streams_count"`
- TopicsCount int `json:"topics_count"`
- PartitionsCount int `json:"partitions_count"`
- SegmentsCount int `json:"segments_count"`
+ StreamsCount uint32 `json:"streams_count"`
+ TopicsCount uint32 `json:"topics_count"`
+ PartitionsCount uint32 `json:"partitions_count"`
+ SegmentsCount uint32 `json:"segments_count"`
MessagesCount uint64 `json:"messages_count"`
- ClientsCount int `json:"clients_count"`
- ConsumerGroupsCount int `json:"consumer_groups_count"`
+ ClientsCount uint32 `json:"clients_count"`
+ ConsumerGroupsCount uint32 `json:"consumer_groups_count"`
Hostname string `json:"hostname"`
OsName string `json:"os_name"`
OsVersion string `json:"os_version"`
diff --git a/foreign/go/contracts/stream.go b/foreign/go/contracts/stream.go
index 8f760ebd..523d5d7e 100644
--- a/foreign/go/contracts/stream.go
+++ b/foreign/go/contracts/stream.go
@@ -23,12 +23,12 @@ type CreateStreamRequest struct {
}
type Stream struct {
- Id int `json:"id"`
+ Id uint32 `json:"id"`
Name string `json:"name"`
SizeBytes uint64 `json:"sizeBytes"`
CreatedAt uint64 `json:"createdAt"`
MessagesCount uint64 `json:"messagesCount"`
- TopicsCount int `json:"topicsCount"`
+ TopicsCount uint32 `json:"topicsCount"`
}
type StreamDetails struct {
diff --git a/foreign/go/contracts/topics.go b/foreign/go/contracts/topics.go
index 504a9e59..23305a78 100644
--- a/foreign/go/contracts/topics.go
+++ b/foreign/go/contracts/topics.go
@@ -21,7 +21,7 @@ import "time"
type CreateTopicRequest struct {
StreamId Identifier `json:"streamId"`
- TopicId int `json:"topicId"`
+ TopicId uint32 `json:"topicId"`
PartitionsCount int `json:"partitionsCount"`
CompressionAlgorithm uint8 `json:"compressionAlgorithm"`
MessageExpiry time.Duration `json:"messageExpiry"`
@@ -41,8 +41,8 @@ type UpdateTopicRequest struct {
}
type Topic struct {
- Id int `json:"id"`
- CreatedAt int `json:"createdAt"`
+ Id uint32 `json:"id"`
+ CreatedAt uint64 `json:"createdAt"`
Name string `json:"name"`
SizeBytes uint64 `json:"sizeBytes"`
MessageExpiry time.Duration `json:"messageExpiry"`
@@ -50,7 +50,7 @@ type Topic struct {
MaxTopicSize uint64 `json:"maxTopicSize"`
ReplicationFactor uint8 `json:"replicationFactor"`
MessagesCount uint64 `json:"messagesCount"`
- PartitionsCount int `json:"partitionsCount"`
+ PartitionsCount uint32 `json:"partitionsCount"`
}
type TopicDetails struct {
diff --git a/foreign/go/errors/constants.go b/foreign/go/errors/constants.go
index 62cb0486..ad5d9ebf 100644
--- a/foreign/go/errors/constants.go
+++ b/foreign/go/errors/constants.go
@@ -26,6 +26,10 @@ var (
Code: 2,
Message: "invalid_configuration",
}
+ InvalidIdentifier = &IggyError{
+ Code: 6,
+ Message: "invalid_identifier",
+ }
StreamIdNotFound = &IggyError{
Code: 1009,
Message: "stream_id_not_found",
diff --git a/foreign/go/iggycli/client.go b/foreign/go/iggycli/client.go
index e8f5fa91..6befa52a 100644
--- a/foreign/go/iggycli/client.go
+++ b/foreign/go/iggycli/client.go
@@ -62,7 +62,7 @@ type Client interface {
messageExpiry time.Duration,
maxTopicSize uint64,
replicationFactor *uint8,
- topicId *int,
+ topicId *uint32,
) (*iggcon.TopicDetails, error)
// UpdateTopic update a topic by unique ID or name.
@@ -256,5 +256,5 @@ type Client interface {
// GetClient get the info about a specific client by unique ID (not to
be confused with the user).
// Authentication is required, and the permission to read the server
info.
- GetClient(clientId int) (*iggcon.ClientInfoDetails, error)
+ GetClient(clientId uint32) (*iggcon.ClientInfoDetails, error)
}
diff --git a/foreign/go/samples/consumer/consumer.go
b/foreign/go/samples/consumer/consumer.go
index 35f9ff2f..b548da69 100644
--- a/foreign/go/samples/consumer/consumer.go
+++ b/foreign/go/samples/consumer/consumer.go
@@ -31,11 +31,11 @@ import (
// config
const (
- DefaultStreamId = 1
- TopicId = 1
+ DefaultStreamId = uint32(1)
+ TopicId = uint32(1)
Partition = 1
Interval = 1000
- ConsumerId = 1
+ ConsumerId = uint32(1)
)
func main() {
@@ -62,8 +62,9 @@ func main() {
}
func EnsureInfrastructureIsInitialized(cli iggycli.Client) error {
- if _, streamErr :=
cli.GetStream(iggcon.NewIdentifier(DefaultStreamId)); streamErr != nil {
- uint32DefaultStreamId := uint32(DefaultStreamId)
+ streamIdentifier, _ := iggcon.NewIdentifier(DefaultStreamId)
+ if _, streamErr := cli.GetStream(streamIdentifier); streamErr != nil {
+ uint32DefaultStreamId := DefaultStreamId
_, streamErr = cli.CreateStream("Test Producer Stream",
&uint32DefaultStreamId)
if streamErr != nil {
@@ -75,10 +76,11 @@ func EnsureInfrastructureIsInitialized(cli iggycli.Client)
error {
fmt.Printf("Stream with ID: %d exists.\n", DefaultStreamId)
- if _, topicErr := cli.GetTopic(iggcon.NewIdentifier(DefaultStreamId),
iggcon.NewIdentifier(TopicId)); topicErr != nil {
+ topicIdentifier, _ := iggcon.NewIdentifier(TopicId)
+ if _, topicErr := cli.GetTopic(streamIdentifier, topicIdentifier);
topicErr != nil {
uint32TopicId := TopicId
_, topicErr = cli.CreateTopic(
- iggcon.NewIdentifier(DefaultStreamId),
+ streamIdentifier,
"Test Topic From Producer Sample",
12,
0,
@@ -103,11 +105,17 @@ func ConsumeMessages(cli iggycli.Client) error {
fmt.Printf("Messages will be polled from stream '%d', topic '%d',
partition '%d' with interval %d ms.\n", DefaultStreamId, TopicId, Partition,
Interval)
for {
+ streamIdentifier, _ := iggcon.NewIdentifier(DefaultStreamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(TopicId)
+ consumerIdentifier, _ := iggcon.NewIdentifier(ConsumerId)
partionId := uint32(Partition)
messagesWrapper, err := cli.PollMessages(
- iggcon.NewIdentifier(DefaultStreamId),
- iggcon.NewIdentifier(TopicId),
- iggcon.Consumer{Kind: iggcon.ConsumerKindSingle, Id:
iggcon.NewIdentifier(ConsumerId)},
+ streamIdentifier,
+ topicIdentifier,
+ iggcon.Consumer{
+ Kind: iggcon.ConsumerKindSingle,
+ Id: consumerIdentifier,
+ },
iggcon.NextPollingStrategy(),
1,
true,
diff --git a/foreign/go/samples/producer/producer.go
b/foreign/go/samples/producer/producer.go
index e4d9774b..068f1316 100644
--- a/foreign/go/samples/producer/producer.go
+++ b/foreign/go/samples/producer/producer.go
@@ -30,8 +30,8 @@ import (
)
const (
- StreamId = 1
- TopicId = 1
+ StreamId = uint32(1)
+ TopicId = uint32(1)
MessageBatchCount = 1
Partition = 1
Interval = 1000
@@ -61,7 +61,8 @@ func main() {
}
func EnsureInfrastructureIsInitialized(cli iggycli.Client) error {
- if _, streamErr := cli.GetStream(iggcon.NewIdentifier(StreamId));
streamErr != nil {
+ streamIdentifier, _ := iggcon.NewIdentifier(StreamId)
+ if _, streamErr := cli.GetStream(streamIdentifier); streamErr != nil {
uint32StreamId := uint32(StreamId)
_, streamErr = cli.CreateStream("Test Producer Stream",
&uint32StreamId)
@@ -76,10 +77,11 @@ func EnsureInfrastructureIsInitialized(cli iggycli.Client)
error {
fmt.Printf("Stream with ID: %d exists.\n", StreamId)
- if _, topicErr := cli.GetTopic(iggcon.NewIdentifier(StreamId),
iggcon.NewIdentifier(TopicId)); topicErr != nil {
+ topicIdentifier, _ := iggcon.NewIdentifier(TopicId)
+ if _, topicErr := cli.GetTopic(streamIdentifier, topicIdentifier);
topicErr != nil {
refStreamId := StreamId
_, topicErr = cli.CreateTopic(
- iggcon.NewIdentifier(StreamId),
+ streamIdentifier,
"Test Topic From Producer Sample",
12,
0,
@@ -124,9 +126,11 @@ func PublishMessages(messageStream iggycli.Client) error {
})
}
+ streamIdentifier, _ := iggcon.NewIdentifier(StreamId)
+ topicIdentifier, _ := iggcon.NewIdentifier(TopicId)
err := messageStream.SendMessages(
- iggcon.NewIdentifier(StreamId),
- iggcon.NewIdentifier(TopicId),
+ streamIdentifier,
+ topicIdentifier,
iggcon.PartitionId(Partition),
messages,
)
diff --git a/foreign/go/tcp/tcp_clients_managament.go
b/foreign/go/tcp/tcp_clients_managament.go
index 922ea066..cf30e7b3 100644
--- a/foreign/go/tcp/tcp_clients_managament.go
+++ b/foreign/go/tcp/tcp_clients_managament.go
@@ -31,8 +31,8 @@ func (tms *IggyTcpClient) GetClients() ([]iggcon.ClientInfo,
error) {
return binaryserialization.DeserializeClients(buffer)
}
-func (tms *IggyTcpClient) GetClient(clientId int) (*iggcon.ClientInfoDetails,
error) {
- message := binaryserialization.SerializeInt(clientId)
+func (tms *IggyTcpClient) GetClient(clientId uint32)
(*iggcon.ClientInfoDetails, error) {
+ message := binaryserialization.SerializeUint32(clientId)
buffer, err := tms.sendAndFetchResponse(message, iggcon.GetClientCode)
if err != nil {
return nil, err
diff --git a/foreign/go/tcp/tcp_topic_managament.go
b/foreign/go/tcp/tcp_topic_managament.go
index 20a08bfb..ebefabee 100644
--- a/foreign/go/tcp/tcp_topic_managament.go
+++ b/foreign/go/tcp/tcp_topic_managament.go
@@ -61,7 +61,7 @@ func (tms *IggyTcpClient) CreateTopic(
messageExpiry time.Duration,
maxTopicSize uint64,
replicationFactor *uint8,
- topicId *int,
+ topicId *uint32,
) (*iggcon.TopicDetails, error) {
if MaxStringLength < len(name) {
return nil, ierror.TextTooLong("topic_name")