This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push: new 8f59d91a feat(go): add DeleteConsumerOffset function. (#2147) 8f59d91a is described below commit 8f59d91aa7551800131c7f0739046ceb497f52ba Author: Chengxi <chengxi.luo2...@gmail.com> AuthorDate: Thu Sep 4 16:02:47 2025 -0400 feat(go): add DeleteConsumerOffset function. (#2147) --- bdd/go/tests/tcp_test/offset_feature_delete.go | 155 +++++++++++++++++++++ .../binary_request_serializer.go | 17 +++ .../consumer_serializer.go} | 27 ++-- foreign/go/contracts/command_codes.go | 1 + foreign/go/contracts/consumer.go | 2 +- foreign/go/contracts/offets.go | 7 + foreign/go/errors/constants.go | 4 + foreign/go/errors/errors.go | 2 + foreign/go/iggycli/client.go | 9 ++ foreign/go/tcp/tcp_offset_managament.go | 11 ++ 10 files changed, 215 insertions(+), 20 deletions(-) diff --git a/bdd/go/tests/tcp_test/offset_feature_delete.go b/bdd/go/tests/tcp_test/offset_feature_delete.go new file mode 100644 index 00000000..2b775e98 --- /dev/null +++ b/bdd/go/tests/tcp_test/offset_feature_delete.go @@ -0,0 +1,155 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tcp_test + +import ( + iggcon "github.com/apache/iggy/foreign/go/contracts" + ierror "github.com/apache/iggy/foreign/go/errors" + "github.com/onsi/ginkgo/v2" +) + +var _ = ginkgo.Describe("DELETE CONSUMER OFFSET:", func() { + prefix := "DeleteConsumerOffset" + ginkgo.When("User is logged in", func() { + ginkgo.Context("and deletes an existing consumer offset after storing it", func() { + client := createAuthorizedConnection() + streamId, _ := successfullyCreateStream(prefix, client) + defer deleteStreamAfterTests(streamId, client) + topicId, _ := successfullyCreateTopic(streamId, client) + groupId, _ := successfullyCreateConsumer(streamId, topicId, client) + + streamIdentifier, _ := iggcon.NewIdentifier(streamId) + topicIdentifier, _ := iggcon.NewIdentifier(topicId) + groupIdentifier, _ := iggcon.NewIdentifier(groupId) + + partitionId := uint32(1) + testOffset := uint64(1) + consumer := iggcon.NewSingleConsumer(groupIdentifier) + + // send test messages + messages := createDefaultMessages() + err := client.SendMessages( + streamIdentifier, + topicIdentifier, + iggcon.PartitionId(partitionId), + messages, + ) + itShouldNotReturnError(err) + + // store consumer offset + err = client.StoreConsumerOffset( + consumer, + streamIdentifier, + topicIdentifier, + testOffset, + &partitionId, + ) + itShouldNotReturnError(err) + + // verify offset was stored + storedOffset, err := client.GetConsumerOffset( + consumer, streamIdentifier, topicIdentifier, &partitionId, + ) + itShouldNotReturnError(err) + itShouldReturnStoredConsumerOffset(storedOffset, partitionId, testOffset) + + // delete the offset + err = client.DeleteConsumerOffset( + consumer, streamIdentifier, topicIdentifier, &partitionId, + ) + itShouldNotReturnError(err) + + // verify offset was deleted + storedOffset, err = client.GetConsumerOffset( + consumer, streamIdentifier, topicIdentifier, &partitionId, + ) + itShouldNotReturnError(err) + itShouldReturnNilOffsetForNewConsumerGroup(storedOffset) + }) + + ginkgo.Context("and attempts to delete a non-existing consumer offset", func() { + client := createAuthorizedConnection() + streamId, _ := successfullyCreateStream(prefix, client) + defer deleteStreamAfterTests(streamId, client) + topicId, _ := successfullyCreateTopic(streamId, client) + groupId, _ := successfullyCreateConsumer(streamId, topicId, client) + + streamIdentifier, _ := iggcon.NewIdentifier(streamId) + topicIdentifier, _ := iggcon.NewIdentifier(topicId) + groupIdentifier, _ := iggcon.NewIdentifier(groupId) + + partitionId := uint32(1) + consumer := iggcon.NewSingleConsumer(groupIdentifier) + + err := client.DeleteConsumerOffset( + consumer, + streamIdentifier, + topicIdentifier, + &partitionId, + ) + itShouldReturnSpecificIggyError(err, ierror.ConsumerOffsetNotFound) + }) + + ginkgo.Context("and attempts to delete an offset from a non-existing consumer group", func() { + client := createAuthorizedConnection() + streamId, _ := successfullyCreateStream(prefix, client) + defer deleteStreamAfterTests(streamId, client) + topicId, _ := successfullyCreateTopic(streamId, client) + + streamIdentifier, _ := iggcon.NewIdentifier(streamId) + topicIdentifier, _ := iggcon.NewIdentifier(topicId) + consumer := iggcon.NewGroupConsumer(randomU32Identifier()) + partitionId := uint32(1) + + err := client.DeleteConsumerOffset( + consumer, + streamIdentifier, + topicIdentifier, + &partitionId, + ) + + itShouldReturnSpecificIggyError(err, ierror.ConsumerGroupIdNotFound) + }) + + ginkgo.Context("and attempts to delete an offset from a non-existing stream", func() { + client := createAuthorizedConnection() + consumer := iggcon.NewGroupConsumer(randomU32Identifier()) + partitionId := uint32(1) + + err := client.DeleteConsumerOffset( + consumer, + randomU32Identifier(), + randomU32Identifier(), + &partitionId) + itShouldReturnSpecificIggyError(err, ierror.StreamIdNotFound) + }) + }) + + ginkgo.When("User is not logged in", func() { + ginkgo.Context("and attempts to delete a consumer offset", func() { + client := createClient() + consumer := iggcon.NewGroupConsumer(randomU32Identifier()) + partitionId := uint32(1) + + err := client.DeleteConsumerOffset( + consumer, randomU32Identifier(), randomU32Identifier(), &partitionId, + ) + itShouldReturnUnauthenticatedError(err) + }) + }) +}) diff --git a/foreign/go/binary_serialization/binary_request_serializer.go b/foreign/go/binary_serialization/binary_request_serializer.go index d69e3b13..7ec4d2c0 100644 --- a/foreign/go/binary_serialization/binary_request_serializer.go +++ b/foreign/go/binary_serialization/binary_request_serializer.go @@ -62,6 +62,23 @@ func GetOffset(request iggcon.GetConsumerOffsetRequest) []byte { return bytes } +func DeleteOffset(request iggcon.DeleteConsumerOffsetRequest) []byte { + consumerBytes := SerializeConsumer(request.Consumer) + streamIdBytes := SerializeIdentifier(request.StreamId) + topicIdBytes := SerializeIdentifier(request.TopicId) + bytes := make([]byte, 0, 12+len(consumerBytes)+len(streamIdBytes)+len(topicIdBytes)) + bytes = append(bytes, consumerBytes...) + bytes = append(bytes, streamIdBytes...) + bytes = append(bytes, topicIdBytes...) + if request.PartitionId != nil { + bytes = binary.LittleEndian.AppendUint32(bytes, *request.PartitionId) + } else { + bytes = binary.LittleEndian.AppendUint32(bytes, 0) + } + + return bytes +} + func CreatePartitions(request iggcon.CreatePartitionsRequest) []byte { bytes := make([]byte, 8+request.StreamId.Length+request.TopicId.Length) position := 4 + request.StreamId.Length + request.TopicId.Length diff --git a/foreign/go/contracts/offets.go b/foreign/go/binary_serialization/consumer_serializer.go similarity index 55% copy from foreign/go/contracts/offets.go copy to foreign/go/binary_serialization/consumer_serializer.go index 7c4a1213..a359ed51 100644 --- a/foreign/go/contracts/offets.go +++ b/foreign/go/binary_serialization/consumer_serializer.go @@ -15,25 +15,14 @@ // specific language governing permissions and limitations // under the License. -package iggcon +package binaryserialization -type StoreConsumerOffsetRequest struct { - StreamId Identifier `json:"streamId"` - TopicId Identifier `json:"topicId"` - Consumer Consumer `json:"consumer"` - PartitionId *uint32 `json:"partitionId"` - Offset uint64 `json:"offset"` -} - -type GetConsumerOffsetRequest struct { - StreamId Identifier `json:"streamId"` - TopicId Identifier `json:"topicId"` - Consumer Consumer `json:"consumer"` - PartitionId *uint32 `json:"partitionId"` -} +import iggcon "github.com/apache/iggy/foreign/go/contracts" -type ConsumerOffsetInfo struct { - PartitionId uint32 `json:"partitionId"` - CurrentOffset uint64 `json:"currentOffset"` - StoredOffset uint64 `json:"storedOffset"` +func SerializeConsumer(consumer iggcon.Consumer) []byte { + idBytes := SerializeIdentifier(consumer.Id) + bytes := make([]byte, 0, 1+len(idBytes)) + bytes = append(bytes, uint8(consumer.Kind)) + bytes = append(bytes, idBytes...) + return bytes } diff --git a/foreign/go/contracts/command_codes.go b/foreign/go/contracts/command_codes.go index 53001c08..f385d826 100644 --- a/foreign/go/contracts/command_codes.go +++ b/foreign/go/contracts/command_codes.go @@ -42,6 +42,7 @@ const ( SendMessagesCode CommandCode = 101 GetOffsetCode CommandCode = 120 StoreOffsetCode CommandCode = 121 + DeleteConsumerOffsetCode CommandCode = 122 GetStreamCode CommandCode = 200 GetStreamsCode CommandCode = 201 CreateStreamCode CommandCode = 202 diff --git a/foreign/go/contracts/consumer.go b/foreign/go/contracts/consumer.go index 72b204ec..9ca9ea12 100644 --- a/foreign/go/contracts/consumer.go +++ b/foreign/go/contracts/consumer.go @@ -17,7 +17,7 @@ package iggcon -type ConsumerKind int +type ConsumerKind uint8 const ( ConsumerKindSingle ConsumerKind = 1 diff --git a/foreign/go/contracts/offets.go b/foreign/go/contracts/offets.go index 7c4a1213..d85c25f4 100644 --- a/foreign/go/contracts/offets.go +++ b/foreign/go/contracts/offets.go @@ -37,3 +37,10 @@ type ConsumerOffsetInfo struct { CurrentOffset uint64 `json:"currentOffset"` StoredOffset uint64 `json:"storedOffset"` } + +type DeleteConsumerOffsetRequest struct { + Consumer Consumer + StreamId Identifier + TopicId Identifier + PartitionId *uint32 +} diff --git a/foreign/go/errors/constants.go b/foreign/go/errors/constants.go index ad5d9ebf..c0294cc9 100644 --- a/foreign/go/errors/constants.go +++ b/foreign/go/errors/constants.go @@ -38,6 +38,10 @@ var ( Code: 2010, Message: "topic_id_not_found", } + ConsumerOffsetNotFound = &IggyError{ + Code: 3021, + Message: "consumer_offset_not_found", + } InvalidMessagesCount = &IggyError{ Code: 4009, Message: "invalid_messages_count", diff --git a/foreign/go/errors/errors.go b/foreign/go/errors/errors.go index 0c45beaf..737f76f9 100644 --- a/foreign/go/errors/errors.go +++ b/foreign/go/errors/errors.go @@ -219,6 +219,8 @@ func TranslateErrorCode(code int) string { return "partition_not_found" case 3008: return "no_partitions" + case 3021: + return "consumer_offset_not_found" case 4000: return "segment_not_found" case 4001: diff --git a/foreign/go/iggycli/client.go b/foreign/go/iggycli/client.go index cc0cc191..a5c2720e 100644 --- a/foreign/go/iggycli/client.go +++ b/foreign/go/iggycli/client.go @@ -123,6 +123,15 @@ type Client interface { // Authentication is required, and the permission to read the streams or topics. GetConsumerGroups(streamId iggcon.Identifier, topicId iggcon.Identifier) ([]iggcon.ConsumerGroup, error) + // DeleteConsumerOffset delete the consumer offset for a specific consumer or consumer group for the given stream and topic by unique IDs or names. + // Authentication is required, and the permission to poll the messages. + DeleteConsumerOffset( + consumer iggcon.Consumer, + streamId iggcon.Identifier, + topicId iggcon.Identifier, + partitionId *uint32, + ) error + // GetConsumerGroup get the info about a specific consumer group by unique ID or name for the given stream and topic by unique IDs or names. // Authentication is required, and the permission to read the streams or topics. GetConsumerGroup( diff --git a/foreign/go/tcp/tcp_offset_managament.go b/foreign/go/tcp/tcp_offset_managament.go index 071dd32f..689e5a60 100644 --- a/foreign/go/tcp/tcp_offset_managament.go +++ b/foreign/go/tcp/tcp_offset_managament.go @@ -48,3 +48,14 @@ func (tms *IggyTcpClient) StoreConsumerOffset(consumer iggcon.Consumer, streamId _, err := tms.sendAndFetchResponse(message, iggcon.StoreOffsetCode) return err } + +func (tms *IggyTcpClient) DeleteConsumerOffset(consumer iggcon.Consumer, streamId iggcon.Identifier, topicId iggcon.Identifier, partitionId *uint32) error { + message := binaryserialization.DeleteOffset(iggcon.DeleteConsumerOffsetRequest{ + Consumer: consumer, + StreamId: streamId, + TopicId: topicId, + PartitionId: partitionId, + }) + _, err := tms.sendAndFetchResponse(message, iggcon.DeleteConsumerOffsetCode) + return err +}