This is an automated email from the ASF dual-hosted git repository.

RongtongJin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new f038a534 [Go] Support offset option for lite topic subscriptions 
(#1285)
f038a534 is described below

commit f038a53433fa5f96fe2b3a9c584ab76482aa9dba
Author: Yim <[email protected]>
AuthorDate: Wed Jun 24 14:50:45 2026 +0800

    [Go] Support offset option for lite topic subscriptions (#1285)
    
    Add OffsetOption helpers and pass optional offset options through 
SyncLiteSubscriptionRequest when subscribing to lite topics.
---
 golang/lite_push_consumer.go      |  22 ++++++--
 golang/lite_push_consumer_mock.go |  13 +++--
 golang/lite_push_consumer_test.go |  69 ++++++++++++++++++++++-
 golang/offset_option.go           | 103 ++++++++++++++++++++++++++++++++++
 golang/offset_option_test.go      | 113 ++++++++++++++++++++++++++++++++++++++
 5 files changed, 309 insertions(+), 11 deletions(-)

diff --git a/golang/lite_push_consumer.go b/golang/lite_push_consumer.go
index c16229ca..63ab3287 100644
--- a/golang/lite_push_consumer.go
+++ b/golang/lite_push_consumer.go
@@ -30,7 +30,7 @@ import (
 
 type LitePushConsumer interface {
        PushConsumer
-       SubscribeLite(liteTopic string) error
+       SubscribeLite(liteTopic string, offsetOption ...OffsetOption) error
        UnSubscribeLite(liteTopic string) error
 }
 
@@ -107,11 +107,18 @@ func (lpc *defaultLitePushConsumer) 
notifyUnsubscribeLite(command *v2.NotifyUnsu
        lpc.litePushConsumerSettings.liteTopicSet.Delete(liteTopic)
 }
 
-func (lpc *defaultLitePushConsumer) SubscribeLite(liteTopic string) error {
+func (lpc *defaultLitePushConsumer) SubscribeLite(liteTopic string, 
offsetOption ...OffsetOption) error {
        if err := lpc.checkRunning(); err != nil {
                return err
        }
-       if err := lpc.syncLiteSubscription(context.TODO(), 
v2.LiteSubscriptionAction_PARTIAL_ADD, []string{liteTopic}); err != nil {
+       if len(offsetOption) > 1 {
+               return errors.New("only one offset option is supported")
+       }
+       var option *OffsetOption
+       if len(offsetOption) == 1 {
+               option = &offsetOption[0]
+       }
+       if err := lpc.syncLiteSubscription(context.TODO(), 
v2.LiteSubscriptionAction_PARTIAL_ADD, []string{liteTopic}, option); err != nil 
{
                sugarBaseLogger.Errorf("LitePushConsumer SubscribeLite 
liteTopic:%s err:%v", liteTopic, err)
                return err
        }
@@ -123,7 +130,7 @@ func (lpc *defaultLitePushConsumer) 
UnSubscribeLite(liteTopic string) error {
        if err := lpc.checkRunning(); err != nil {
                return err
        }
-       if err := lpc.syncLiteSubscription(context.TODO(), 
v2.LiteSubscriptionAction_PARTIAL_REMOVE, []string{liteTopic}); err != nil {
+       if err := lpc.syncLiteSubscription(context.TODO(), 
v2.LiteSubscriptionAction_PARTIAL_REMOVE, []string{liteTopic}, nil); err != nil 
{
                sugarBaseLogger.Errorf("LitePushConsumer UnSubscribeLite 
liteTopic:%s err:%v", liteTopic, err)
                return err
        }
@@ -151,12 +158,12 @@ func (lpc *defaultLitePushConsumer) 
syncAllLiteSubscription() {
        //if len(liteTopicSet) == 0 {
        //      return
        //}
-       if err := lpc.syncLiteSubscription(context.TODO(), 
v2.LiteSubscriptionAction_COMPLETE_ADD, liteTopicSet); err != nil {
+       if err := lpc.syncLiteSubscription(context.TODO(), 
v2.LiteSubscriptionAction_COMPLETE_ADD, liteTopicSet, nil); err != nil {
                sugarBaseLogger.Errorf("LitePushConsumer 
syncAllLiteSubscription:%v,  err:%v", liteTopicSet, err)
        }
 }
 
-func (lpc *defaultLitePushConsumer) syncLiteSubscription(context 
context.Context, action v2.LiteSubscriptionAction, diff []string) error {
+func (lpc *defaultLitePushConsumer) syncLiteSubscription(context 
context.Context, action v2.LiteSubscriptionAction, diff []string, offsetOption 
*OffsetOption) error {
        topic := lpc.litePushConsumerSettings.bindTopic
        group := lpc.litePushConsumerSettings.groupName
        clientId := lpc.litePushConsumerSettings.clientId
@@ -171,6 +178,9 @@ func (lpc *defaultLitePushConsumer) 
syncLiteSubscription(context context.Context
                Group:        group,
                LiteTopicSet: diff,
        }
+       if offsetOption != nil {
+               request.OffsetOption = offsetOption.toProtobuf()
+       }
 
        if action == v2.LiteSubscriptionAction_COMPLETE_ADD {
                sugarBaseLogger.Infof("syncLiteSubscription action:%s, 
topic:%s, group:%s, clientId:%s, liteTopicCount:%d",
diff --git a/golang/lite_push_consumer_mock.go 
b/golang/lite_push_consumer_mock.go
index ae15047a..bf75ecd8 100644
--- a/golang/lite_push_consumer_mock.go
+++ b/golang/lite_push_consumer_mock.go
@@ -34,17 +34,22 @@ func (m *MockLitePushConsumer) EXPECT() 
*MockLitePushConsumerMockRecorder {
 }
 
 // SubscribeLite mocks base method.
-func (m *MockLitePushConsumer) SubscribeLite(liteTopic string) error {
+func (m *MockLitePushConsumer) SubscribeLite(liteTopic string, offsetOption 
...OffsetOption) error {
        m.ctrl.T.Helper()
-       ret := m.ctrl.Call(m, "SubscribeLite", liteTopic)
+       varargs := []interface{}{liteTopic}
+       for _, a := range offsetOption {
+               varargs = append(varargs, a)
+       }
+       ret := m.ctrl.Call(m, "SubscribeLite", varargs...)
        ret0, _ := ret[0].(error)
        return ret0
 }
 
 // SubscribeLite indicates an expected call of SubscribeLite.
-func (mr *MockLitePushConsumerMockRecorder) SubscribeLite(liteTopic 
interface{}) *gomock.Call {
+func (mr *MockLitePushConsumerMockRecorder) SubscribeLite(liteTopic 
interface{}, offsetOption ...interface{}) *gomock.Call {
        mr.mock.ctrl.T.Helper()
-       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeLite", 
reflect.TypeOf((*MockLitePushConsumer)(nil).SubscribeLite), liteTopic)
+       varargs := append([]interface{}{liteTopic}, offsetOption...)
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeLite", 
reflect.TypeOf((*MockLitePushConsumer)(nil).SubscribeLite), varargs...)
 }
 
 // UnSubscribeLite mocks base method.
diff --git a/golang/lite_push_consumer_test.go 
b/golang/lite_push_consumer_test.go
index a084794d..a9018c56 100644
--- a/golang/lite_push_consumer_test.go
+++ b/golang/lite_push_consumer_test.go
@@ -206,6 +206,73 @@ func TestLitePushConsumer_SubscribeLite(t *testing.T) {
        }
 }
 
+func TestLitePushConsumer_SubscribeLite_WithOffset(t *testing.T) {
+       setupTest(t)
+       defer teardownTest()
+
+       dlpc, err := createTestLitePushConsumer(t)
+       if err != nil {
+               t.Fatalf("failed to create test lite push consumer: %v", err)
+       }
+
+       offsetOption, err := NewOffsetOptionWithOffset(100)
+       if err != nil {
+               t.Fatalf("failed to create offset option: %v", err)
+       }
+
+       mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(), 
gomock.Any()).DoAndReturn(func(ctx context.Context, req 
*v2.SyncLiteSubscriptionRequest) (*v2.SyncLiteSubscriptionResponse, error) {
+               if req.GetAction() != v2.LiteSubscriptionAction_PARTIAL_ADD {
+                       t.Errorf("expected action PARTIAL_ADD, got %v", 
req.GetAction())
+               }
+               if len(req.GetLiteTopicSet()) != 1 || req.GetLiteTopicSet()[0] 
!= "lite-topic-1" {
+                       t.Errorf("expected lite topic set ['lite-topic-1'], got 
%v", req.GetLiteTopicSet())
+               }
+               if req.GetOffsetOption() == nil {
+                       t.Fatal("expected offset option to be set")
+               }
+               if _, ok := 
req.GetOffsetOption().GetOffsetType().(*v2.OffsetOption_Offset); !ok {
+                       t.Fatalf("expected offset option type OFFSET, got %T", 
req.GetOffsetOption().GetOffsetType())
+               }
+               if req.GetOffsetOption().GetOffset() != 100 {
+                       t.Errorf("expected offset 100, got %d", 
req.GetOffsetOption().GetOffset())
+               }
+               return setupSuccessResponse(), nil
+       }).Times(1)
+
+       err = dlpc.SubscribeLite("lite-topic-1", offsetOption)
+       if err != nil {
+               t.Fatalf("expected no error for SubscribeLite with offset, got 
%v", err)
+       }
+}
+
+func TestLitePushConsumer_SubscribeLite_WithLastOffset(t *testing.T) {
+       setupTest(t)
+       defer teardownTest()
+
+       dlpc, err := createTestLitePushConsumer(t)
+       if err != nil {
+               t.Fatalf("failed to create test lite push consumer: %v", err)
+       }
+
+       mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(), 
gomock.Any()).DoAndReturn(func(ctx context.Context, req 
*v2.SyncLiteSubscriptionRequest) (*v2.SyncLiteSubscriptionResponse, error) {
+               if req.GetOffsetOption() == nil {
+                       t.Fatal("expected offset option to be set")
+               }
+               if _, ok := 
req.GetOffsetOption().GetOffsetType().(*v2.OffsetOption_Policy_); !ok {
+                       t.Fatalf("expected offset option type POLICY, got %T", 
req.GetOffsetOption().GetOffsetType())
+               }
+               if req.GetOffsetOption().GetPolicy() != v2.OffsetOption_LAST {
+                       t.Errorf("expected policy LAST, got %v", 
req.GetOffsetOption().GetPolicy())
+               }
+               return setupSuccessResponse(), nil
+       }).Times(1)
+
+       err = dlpc.SubscribeLite("lite-topic-1", LastOffset)
+       if err != nil {
+               t.Fatalf("expected no error for SubscribeLite with last offset, 
got %v", err)
+       }
+}
+
 func TestLitePushConsumer_SubscribeLite_NotRunning(t *testing.T) {
        setupTest(t)
        defer teardownTest()
@@ -335,7 +402,7 @@ func 
TestLitePushConsumer_syncLiteSubscription_StatusError(t *testing.T) {
 
        mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(), 
gomock.Any()).Return(setupErrorResponse(v2.Code_INTERNAL_SERVER_ERROR, 
"internal error"), nil)
 
-       err = dlpc.syncLiteSubscription(context.TODO(), 
v2.LiteSubscriptionAction_PARTIAL_ADD, []string{"test"})
+       err = dlpc.syncLiteSubscription(context.TODO(), 
v2.LiteSubscriptionAction_PARTIAL_ADD, []string{"test"}, nil)
        if err == nil {
                t.Fatal("expected error for non-OK status code")
        }
diff --git a/golang/offset_option.go b/golang/offset_option.go
new file mode 100644
index 00000000..1a79d8c6
--- /dev/null
+++ b/golang/offset_option.go
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package golang
+
+import (
+       "fmt"
+
+       v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
+)
+
+type offsetOptionType int
+
+const (
+       offsetOptionTypePolicy offsetOptionType = iota
+       offsetOptionTypeOffset
+       offsetOptionTypeTailN
+       offsetOptionTypeTimestamp
+)
+
+// OffsetOption specifies the starting offset for lite topic consumption.
+type OffsetOption struct {
+       optionType offsetOptionType
+       value      int64
+}
+
+var (
+       // LastOffset starts consuming from the last consumed offset of the 
consumer group.
+       LastOffset = OffsetOption{optionType: offsetOptionTypePolicy, value: 
int64(v2.OffsetOption_LAST)}
+       // MinOffset starts consuming from the minimum available offset.
+       MinOffset = OffsetOption{optionType: offsetOptionTypePolicy, value: 
int64(v2.OffsetOption_MIN)}
+       // MaxOffset starts consuming from the maximum available offset, 
skipping existing messages.
+       MaxOffset = OffsetOption{optionType: offsetOptionTypePolicy, value: 
int64(v2.OffsetOption_MAX)}
+)
+
+// NewOffsetOptionWithOffset creates an OffsetOption from a specific offset.
+func NewOffsetOptionWithOffset(offset int64) (OffsetOption, error) {
+       if offset < 0 {
+               return OffsetOption{}, fmt.Errorf("offset must be greater than 
or equal to 0")
+       }
+       return OffsetOption{optionType: offsetOptionTypeOffset, value: offset}, 
nil
+}
+
+// NewOffsetOptionWithTailN creates an OffsetOption from the last N messages.
+func NewOffsetOptionWithTailN(tailN int64) (OffsetOption, error) {
+       if tailN < 0 {
+               return OffsetOption{}, fmt.Errorf("tailN must be greater than 
or equal to 0")
+       }
+       return OffsetOption{optionType: offsetOptionTypeTailN, value: tailN}, 
nil
+}
+
+// NewOffsetOptionWithTimestamp creates an OffsetOption from a Unix 
millisecond timestamp.
+func NewOffsetOptionWithTimestamp(timestamp int64) (OffsetOption, error) {
+       if timestamp < 0 {
+               return OffsetOption{}, fmt.Errorf("timestamp must be greater 
than or equal to 0")
+       }
+       return OffsetOption{optionType: offsetOptionTypeTimestamp, value: 
timestamp}, nil
+}
+
+func (option OffsetOption) toProtobuf() *v2.OffsetOption {
+       switch option.optionType {
+       case offsetOptionTypePolicy:
+               return &v2.OffsetOption{
+                       OffsetType: &v2.OffsetOption_Policy_{
+                               Policy: v2.OffsetOption_Policy(option.value),
+                       },
+               }
+       case offsetOptionTypeOffset:
+               return &v2.OffsetOption{
+                       OffsetType: &v2.OffsetOption_Offset{
+                               Offset: option.value,
+                       },
+               }
+       case offsetOptionTypeTailN:
+               return &v2.OffsetOption{
+                       OffsetType: &v2.OffsetOption_TailN{
+                               TailN: option.value,
+                       },
+               }
+       case offsetOptionTypeTimestamp:
+               return &v2.OffsetOption{
+                       OffsetType: &v2.OffsetOption_Timestamp{
+                               Timestamp: option.value,
+                       },
+               }
+       default:
+               return nil
+       }
+}
diff --git a/golang/offset_option_test.go b/golang/offset_option_test.go
new file mode 100644
index 00000000..954b3f59
--- /dev/null
+++ b/golang/offset_option_test.go
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package golang
+
+import (
+       "testing"
+
+       v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
+)
+
+func TestOffsetOptionToProtobuf(t *testing.T) {
+       tests := []struct {
+               name        string
+               option      OffsetOption
+               offsetType  interface{}
+               expectValue int64
+       }{
+               {
+                       name:        "last offset",
+                       option:      LastOffset,
+                       offsetType:  &v2.OffsetOption_Policy_{},
+                       expectValue: int64(v2.OffsetOption_LAST),
+               },
+               {
+                       name:        "offset",
+                       option:      
mustOffsetOption(NewOffsetOptionWithOffset(100)),
+                       offsetType:  &v2.OffsetOption_Offset{},
+                       expectValue: 100,
+               },
+               {
+                       name:        "tail n",
+                       option:      
mustOffsetOption(NewOffsetOptionWithTailN(10)),
+                       offsetType:  &v2.OffsetOption_TailN{},
+                       expectValue: 10,
+               },
+               {
+                       name:        "timestamp",
+                       option:      
mustOffsetOption(NewOffsetOptionWithTimestamp(1234567890)),
+                       offsetType:  &v2.OffsetOption_Timestamp{},
+                       expectValue: 1234567890,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       protobufOption := tt.option.toProtobuf()
+                       switch tt.offsetType.(type) {
+                       case *v2.OffsetOption_Policy_:
+                               if _, ok := 
protobufOption.GetOffsetType().(*v2.OffsetOption_Policy_); !ok {
+                                       t.Fatalf("expected policy offset type, 
got %T", protobufOption.GetOffsetType())
+                               }
+                               if int64(protobufOption.GetPolicy()) != 
tt.expectValue {
+                                       t.Errorf("expected policy %d, got %d", 
tt.expectValue, protobufOption.GetPolicy())
+                               }
+                       case *v2.OffsetOption_Offset:
+                               if _, ok := 
protobufOption.GetOffsetType().(*v2.OffsetOption_Offset); !ok {
+                                       t.Fatalf("expected offset type, got 
%T", protobufOption.GetOffsetType())
+                               }
+                               if protobufOption.GetOffset() != tt.expectValue 
{
+                                       t.Errorf("expected offset %d, got %d", 
tt.expectValue, protobufOption.GetOffset())
+                               }
+                       case *v2.OffsetOption_TailN:
+                               if _, ok := 
protobufOption.GetOffsetType().(*v2.OffsetOption_TailN); !ok {
+                                       t.Fatalf("expected tail_n type, got 
%T", protobufOption.GetOffsetType())
+                               }
+                               if protobufOption.GetTailN() != tt.expectValue {
+                                       t.Errorf("expected tail_n %d, got %d", 
tt.expectValue, protobufOption.GetTailN())
+                               }
+                       case *v2.OffsetOption_Timestamp:
+                               if _, ok := 
protobufOption.GetOffsetType().(*v2.OffsetOption_Timestamp); !ok {
+                                       t.Fatalf("expected timestamp type, got 
%T", protobufOption.GetOffsetType())
+                               }
+                               if protobufOption.GetTimestamp() != 
tt.expectValue {
+                                       t.Errorf("expected timestamp %d, got 
%d", tt.expectValue, protobufOption.GetTimestamp())
+                               }
+                       }
+               })
+       }
+}
+
+func TestOffsetOptionRejectsNegativeValue(t *testing.T) {
+       if _, err := NewOffsetOptionWithOffset(-1); err == nil {
+               t.Fatal("expected error for negative offset")
+       }
+       if _, err := NewOffsetOptionWithTailN(-1); err == nil {
+               t.Fatal("expected error for negative tailN")
+       }
+       if _, err := NewOffsetOptionWithTimestamp(-1); err == nil {
+               t.Fatal("expected error for negative timestamp")
+       }
+}
+
+func mustOffsetOption(option OffsetOption, err error) OffsetOption {
+       if err != nil {
+               panic(err)
+       }
+       return option
+}

Reply via email to