This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f59d91a feat(go): add DeleteConsumerOffset function. (#2147)
8f59d91a is described below

commit 8f59d91aa7551800131c7f0739046ceb497f52ba
Author: Chengxi <chengxi.luo2...@gmail.com>
AuthorDate: Thu Sep 4 16:02:47 2025 -0400

    feat(go): add DeleteConsumerOffset function. (#2147)
---
 bdd/go/tests/tcp_test/offset_feature_delete.go     | 155 +++++++++++++++++++++
 .../binary_request_serializer.go                   |  17 +++
 .../consumer_serializer.go}                        |  27 ++--
 foreign/go/contracts/command_codes.go              |   1 +
 foreign/go/contracts/consumer.go                   |   2 +-
 foreign/go/contracts/offets.go                     |   7 +
 foreign/go/errors/constants.go                     |   4 +
 foreign/go/errors/errors.go                        |   2 +
 foreign/go/iggycli/client.go                       |   9 ++
 foreign/go/tcp/tcp_offset_managament.go            |  11 ++
 10 files changed, 215 insertions(+), 20 deletions(-)

diff --git a/bdd/go/tests/tcp_test/offset_feature_delete.go 
b/bdd/go/tests/tcp_test/offset_feature_delete.go
new file mode 100644
index 00000000..2b775e98
--- /dev/null
+++ b/bdd/go/tests/tcp_test/offset_feature_delete.go
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tcp_test
+
+import (
+       iggcon "github.com/apache/iggy/foreign/go/contracts"
+       ierror "github.com/apache/iggy/foreign/go/errors"
+       "github.com/onsi/ginkgo/v2"
+)
+
+var _ = ginkgo.Describe("DELETE CONSUMER OFFSET:", func() {
+       prefix := "DeleteConsumerOffset"
+       ginkgo.When("User is logged in", func() {
+               ginkgo.Context("and deletes an existing consumer offset after 
storing it", func() {
+                       client := createAuthorizedConnection()
+                       streamId, _ := successfullyCreateStream(prefix, client)
+                       defer deleteStreamAfterTests(streamId, client)
+                       topicId, _ := successfullyCreateTopic(streamId, client)
+                       groupId, _ := successfullyCreateConsumer(streamId, 
topicId, client)
+
+                       streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+                       topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+                       groupIdentifier, _ := iggcon.NewIdentifier(groupId)
+
+                       partitionId := uint32(1)
+                       testOffset := uint64(1)
+                       consumer := iggcon.NewSingleConsumer(groupIdentifier)
+
+                       // send test messages
+                       messages := createDefaultMessages()
+                       err := client.SendMessages(
+                               streamIdentifier,
+                               topicIdentifier,
+                               iggcon.PartitionId(partitionId),
+                               messages,
+                       )
+                       itShouldNotReturnError(err)
+
+                       // store consumer offset
+                       err = client.StoreConsumerOffset(
+                               consumer,
+                               streamIdentifier,
+                               topicIdentifier,
+                               testOffset,
+                               &partitionId,
+                       )
+                       itShouldNotReturnError(err)
+
+                       // verify offset was stored
+                       storedOffset, err := client.GetConsumerOffset(
+                               consumer, streamIdentifier, topicIdentifier, 
&partitionId,
+                       )
+                       itShouldNotReturnError(err)
+                       itShouldReturnStoredConsumerOffset(storedOffset, 
partitionId, testOffset)
+
+                       // delete the offset
+                       err = client.DeleteConsumerOffset(
+                               consumer, streamIdentifier, topicIdentifier, 
&partitionId,
+                       )
+                       itShouldNotReturnError(err)
+
+                       // verify offset was deleted
+                       storedOffset, err = client.GetConsumerOffset(
+                               consumer, streamIdentifier, topicIdentifier, 
&partitionId,
+                       )
+                       itShouldNotReturnError(err)
+                       itShouldReturnNilOffsetForNewConsumerGroup(storedOffset)
+               })
+
+               ginkgo.Context("and attempts to delete a non-existing consumer 
offset", func() {
+                       client := createAuthorizedConnection()
+                       streamId, _ := successfullyCreateStream(prefix, client)
+                       defer deleteStreamAfterTests(streamId, client)
+                       topicId, _ := successfullyCreateTopic(streamId, client)
+                       groupId, _ := successfullyCreateConsumer(streamId, 
topicId, client)
+
+                       streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+                       topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+                       groupIdentifier, _ := iggcon.NewIdentifier(groupId)
+
+                       partitionId := uint32(1)
+                       consumer := iggcon.NewSingleConsumer(groupIdentifier)
+
+                       err := client.DeleteConsumerOffset(
+                               consumer,
+                               streamIdentifier,
+                               topicIdentifier,
+                               &partitionId,
+                       )
+                       itShouldReturnSpecificIggyError(err, 
ierror.ConsumerOffsetNotFound)
+               })
+
+               ginkgo.Context("and attempts to delete an offset from a 
non-existing consumer group", func() {
+                       client := createAuthorizedConnection()
+                       streamId, _ := successfullyCreateStream(prefix, client)
+                       defer deleteStreamAfterTests(streamId, client)
+                       topicId, _ := successfullyCreateTopic(streamId, client)
+
+                       streamIdentifier, _ := iggcon.NewIdentifier(streamId)
+                       topicIdentifier, _ := iggcon.NewIdentifier(topicId)
+                       consumer := 
iggcon.NewGroupConsumer(randomU32Identifier())
+                       partitionId := uint32(1)
+
+                       err := client.DeleteConsumerOffset(
+                               consumer,
+                               streamIdentifier,
+                               topicIdentifier,
+                               &partitionId,
+                       )
+
+                       itShouldReturnSpecificIggyError(err, 
ierror.ConsumerGroupIdNotFound)
+               })
+
+               ginkgo.Context("and attempts to delete an offset from a 
non-existing stream", func() {
+                       client := createAuthorizedConnection()
+                       consumer := 
iggcon.NewGroupConsumer(randomU32Identifier())
+                       partitionId := uint32(1)
+
+                       err := client.DeleteConsumerOffset(
+                               consumer,
+                               randomU32Identifier(),
+                               randomU32Identifier(),
+                               &partitionId)
+                       itShouldReturnSpecificIggyError(err, 
ierror.StreamIdNotFound)
+               })
+       })
+
+       ginkgo.When("User is not logged in", func() {
+               ginkgo.Context("and attempts to delete a consumer offset", 
func() {
+                       client := createClient()
+                       consumer := 
iggcon.NewGroupConsumer(randomU32Identifier())
+                       partitionId := uint32(1)
+
+                       err := client.DeleteConsumerOffset(
+                               consumer, randomU32Identifier(), 
randomU32Identifier(), &partitionId,
+                       )
+                       itShouldReturnUnauthenticatedError(err)
+               })
+       })
+})
diff --git a/foreign/go/binary_serialization/binary_request_serializer.go 
b/foreign/go/binary_serialization/binary_request_serializer.go
index d69e3b13..7ec4d2c0 100644
--- a/foreign/go/binary_serialization/binary_request_serializer.go
+++ b/foreign/go/binary_serialization/binary_request_serializer.go
@@ -62,6 +62,23 @@ func GetOffset(request iggcon.GetConsumerOffsetRequest) 
[]byte {
        return bytes
 }
 
+func DeleteOffset(request iggcon.DeleteConsumerOffsetRequest) []byte {
+       consumerBytes := SerializeConsumer(request.Consumer)
+       streamIdBytes := SerializeIdentifier(request.StreamId)
+       topicIdBytes := SerializeIdentifier(request.TopicId)
+       bytes := make([]byte, 0, 
12+len(consumerBytes)+len(streamIdBytes)+len(topicIdBytes))
+       bytes = append(bytes, consumerBytes...)
+       bytes = append(bytes, streamIdBytes...)
+       bytes = append(bytes, topicIdBytes...)
+       if request.PartitionId != nil {
+               bytes = binary.LittleEndian.AppendUint32(bytes, 
*request.PartitionId)
+       } else {
+               bytes = binary.LittleEndian.AppendUint32(bytes, 0)
+       }
+
+       return bytes
+}
+
 func CreatePartitions(request iggcon.CreatePartitionsRequest) []byte {
        bytes := make([]byte, 8+request.StreamId.Length+request.TopicId.Length)
        position := 4 + request.StreamId.Length + request.TopicId.Length
diff --git a/foreign/go/contracts/offets.go 
b/foreign/go/binary_serialization/consumer_serializer.go
similarity index 55%
copy from foreign/go/contracts/offets.go
copy to foreign/go/binary_serialization/consumer_serializer.go
index 7c4a1213..a359ed51 100644
--- a/foreign/go/contracts/offets.go
+++ b/foreign/go/binary_serialization/consumer_serializer.go
@@ -15,25 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package iggcon
+package binaryserialization
 
-type StoreConsumerOffsetRequest struct {
-       StreamId    Identifier `json:"streamId"`
-       TopicId     Identifier `json:"topicId"`
-       Consumer    Consumer   `json:"consumer"`
-       PartitionId *uint32    `json:"partitionId"`
-       Offset      uint64     `json:"offset"`
-}
-
-type GetConsumerOffsetRequest struct {
-       StreamId    Identifier `json:"streamId"`
-       TopicId     Identifier `json:"topicId"`
-       Consumer    Consumer   `json:"consumer"`
-       PartitionId *uint32    `json:"partitionId"`
-}
+import iggcon "github.com/apache/iggy/foreign/go/contracts"
 
-type ConsumerOffsetInfo struct {
-       PartitionId   uint32 `json:"partitionId"`
-       CurrentOffset uint64 `json:"currentOffset"`
-       StoredOffset  uint64 `json:"storedOffset"`
+func SerializeConsumer(consumer iggcon.Consumer) []byte {
+       idBytes := SerializeIdentifier(consumer.Id)
+       bytes := make([]byte, 0, 1+len(idBytes))
+       bytes = append(bytes, uint8(consumer.Kind))
+       bytes = append(bytes, idBytes...)
+       return bytes
 }
diff --git a/foreign/go/contracts/command_codes.go 
b/foreign/go/contracts/command_codes.go
index 53001c08..f385d826 100644
--- a/foreign/go/contracts/command_codes.go
+++ b/foreign/go/contracts/command_codes.go
@@ -42,6 +42,7 @@ const (
        SendMessagesCode         CommandCode = 101
        GetOffsetCode            CommandCode = 120
        StoreOffsetCode          CommandCode = 121
+       DeleteConsumerOffsetCode CommandCode = 122
        GetStreamCode            CommandCode = 200
        GetStreamsCode           CommandCode = 201
        CreateStreamCode         CommandCode = 202
diff --git a/foreign/go/contracts/consumer.go b/foreign/go/contracts/consumer.go
index 72b204ec..9ca9ea12 100644
--- a/foreign/go/contracts/consumer.go
+++ b/foreign/go/contracts/consumer.go
@@ -17,7 +17,7 @@
 
 package iggcon
 
-type ConsumerKind int
+type ConsumerKind uint8
 
 const (
        ConsumerKindSingle ConsumerKind = 1
diff --git a/foreign/go/contracts/offets.go b/foreign/go/contracts/offets.go
index 7c4a1213..d85c25f4 100644
--- a/foreign/go/contracts/offets.go
+++ b/foreign/go/contracts/offets.go
@@ -37,3 +37,10 @@ type ConsumerOffsetInfo struct {
        CurrentOffset uint64 `json:"currentOffset"`
        StoredOffset  uint64 `json:"storedOffset"`
 }
+
+type DeleteConsumerOffsetRequest struct {
+       Consumer    Consumer
+       StreamId    Identifier
+       TopicId     Identifier
+       PartitionId *uint32
+}
diff --git a/foreign/go/errors/constants.go b/foreign/go/errors/constants.go
index ad5d9ebf..c0294cc9 100644
--- a/foreign/go/errors/constants.go
+++ b/foreign/go/errors/constants.go
@@ -38,6 +38,10 @@ var (
                Code:    2010,
                Message: "topic_id_not_found",
        }
+       ConsumerOffsetNotFound = &IggyError{
+               Code:    3021,
+               Message: "consumer_offset_not_found",
+       }
        InvalidMessagesCount = &IggyError{
                Code:    4009,
                Message: "invalid_messages_count",
diff --git a/foreign/go/errors/errors.go b/foreign/go/errors/errors.go
index 0c45beaf..737f76f9 100644
--- a/foreign/go/errors/errors.go
+++ b/foreign/go/errors/errors.go
@@ -219,6 +219,8 @@ func TranslateErrorCode(code int) string {
                return "partition_not_found"
        case 3008:
                return "no_partitions"
+       case 3021:
+               return "consumer_offset_not_found"
        case 4000:
                return "segment_not_found"
        case 4001:
diff --git a/foreign/go/iggycli/client.go b/foreign/go/iggycli/client.go
index cc0cc191..a5c2720e 100644
--- a/foreign/go/iggycli/client.go
+++ b/foreign/go/iggycli/client.go
@@ -123,6 +123,15 @@ type Client interface {
        // Authentication is required, and the permission to read the streams 
or topics.
        GetConsumerGroups(streamId iggcon.Identifier, topicId 
iggcon.Identifier) ([]iggcon.ConsumerGroup, error)
 
+       // DeleteConsumerOffset delete the consumer offset for a specific 
consumer or consumer group for the given stream and topic by unique IDs or 
names.
+       // Authentication is required, and the permission to poll the messages.
+       DeleteConsumerOffset(
+               consumer iggcon.Consumer,
+               streamId iggcon.Identifier,
+               topicId iggcon.Identifier,
+               partitionId *uint32,
+       ) error
+
        // GetConsumerGroup get the info about a specific consumer group by 
unique ID or name for the given stream and topic by unique IDs or names.
        // Authentication is required, and the permission to read the streams 
or topics.
        GetConsumerGroup(
diff --git a/foreign/go/tcp/tcp_offset_managament.go 
b/foreign/go/tcp/tcp_offset_managament.go
index 071dd32f..689e5a60 100644
--- a/foreign/go/tcp/tcp_offset_managament.go
+++ b/foreign/go/tcp/tcp_offset_managament.go
@@ -48,3 +48,14 @@ func (tms *IggyTcpClient) StoreConsumerOffset(consumer 
iggcon.Consumer, streamId
        _, err := tms.sendAndFetchResponse(message, iggcon.StoreOffsetCode)
        return err
 }
+
+func (tms *IggyTcpClient) DeleteConsumerOffset(consumer iggcon.Consumer, 
streamId iggcon.Identifier, topicId iggcon.Identifier, partitionId *uint32) 
error {
+       message := 
binaryserialization.DeleteOffset(iggcon.DeleteConsumerOffsetRequest{
+               Consumer:    consumer,
+               StreamId:    streamId,
+               TopicId:     topicId,
+               PartitionId: partitionId,
+       })
+       _, err := tms.sendAndFetchResponse(message, 
iggcon.DeleteConsumerOffsetCode)
+       return err
+}

Reply via email to