This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 601aee5b refactor(lite_push_consumer): use `sync.Map` to replace regular map and fix related method calls (#1171)
601aee5b is described below

commit 601aee5be5318dd903bf8c69ae6ababb88cad183
Author: Quan <[email protected]>
AuthorDate: Mon Jan 12 14:24:25 2026 +0800

    refactor(lite_push_consumer): use `sync.Map` to replace regular map and fix related method calls (#1171)
    
    test(lite_push_consumer_test): update test cases to match new `sync.Map` implementation
    
    docs(README.md): update development guide and build steps
    
    Change-Id: I08e1483141fd4f4e21629be86160f9e42de0ffcb
    
    Co-authored-by: 靖泉 <[email protected]>
---
 golang/README.md                     |  27 ++++++++
 golang/README_dev.md                 |   4 --
 golang/lite_push_consumer.go         |  25 ++++---
 golang/lite_push_consumer_options.go |  13 ++--
 golang/lite_push_consumer_test.go    | 128 +++++++++--------------------------
 golang/push_consumer_test.go         |  17 +++++
 6 files changed, 96 insertions(+), 118 deletions(-)

diff --git a/golang/README.md b/golang/README.md
index 347b8713..41df54a6 100644
--- a/golang/README.md
+++ b/golang/README.md
@@ -24,5 +24,32 @@ Otherwise, to install the `golang` package, run the following command:
 go get -u github.com/apache/rocketmq-clients/golang/v5
 ```
 
+## Development
+
+### Protocol Buffers Generation
+
+If you need to regenerate the Protocol Buffers code after modifying the `.proto` files, run the following commands:
+
+```bash
+protoc --go-grpc_out=. apache/rocketmq/v2/*.proto
+protoc --go_out=. apache/rocketmq/v2/*.proto
+```
+
+### Building
+
+To build the entire project:
+
+```bash
+go build ./...
+```
+
+### Running Tests
+
+To run all tests:
+
+```bash
+go test ./...
+```
+
 [codecov-golang-image]: https://img.shields.io/codecov/c/gh/apache/rocketmq-clients/master?flag=golang&label=Golang%20Coverage&logo=codecov
 [codecov-url]: https://app.codecov.io/gh/apache/rocketmq-clients
diff --git a/golang/README_dev.md b/golang/README_dev.md
deleted file mode 100644
index 1c21a789..00000000
--- a/golang/README_dev.md
+++ /dev/null
@@ -1,4 +0,0 @@
-
-# golang pb generate:
-protoc --go-grpc_out=. apache/rocketmq/v2/*.proto
-protoc --go_out=. apache/rocketmq/v2/*.proto
\ No newline at end of file
diff --git a/golang/lite_push_consumer.go b/golang/lite_push_consumer.go
index 2789ba93..3fa620fe 100644
--- a/golang/lite_push_consumer.go
+++ b/golang/lite_push_consumer.go
@@ -54,15 +54,15 @@ func NewLitePushConsumerConfig(bindTopic string, 
invisibleDuration time.Duration
 }
 
 var NewLitePushConsumer = func(config *Config, liteConfig 
*LitePushConsumerConfig, opts ...PushConsumerOption) (LitePushConsumer, error) {
+       if liteConfig == nil {
+               return nil, errors.New("LitePushConsumerConfig is required")
+       }
        if liteConfig.bindTopic == "" {
                return nil, errors.New("LitePushConsumerConfig.bindTopic is 
required")
        }
        filterExpressionMap := map[string]*FilterExpression{
                liteConfig.bindTopic: SUB_ALL,
        }
-       if liteConfig == nil {
-               return nil, errors.New("LitePushConsumerConfig is required")
-       }
        opts = append(opts, 
WithPushSubscriptionExpressions(filterExpressionMap))
        if pushConsumer, err := newPushConsumer(config, opts...); err != nil {
                return nil, err
@@ -104,7 +104,7 @@ func (lpc *defaultLitePushConsumer) 
notifyUnsubscribeLite(command *v2.NotifyUnsu
        if liteTopic == "" {
                return
        }
-       delete(lpc.litePushConsumerSettings.liteTopicSet, liteTopic)
+       lpc.litePushConsumerSettings.liteTopicSet.Delete(liteTopic)
 }
 
 func (lpc *defaultLitePushConsumer) SubscribeLite(topic string) error {
@@ -115,7 +115,7 @@ func (lpc *defaultLitePushConsumer) SubscribeLite(topic 
string) error {
                sugarBaseLogger.Errorf("LitePushConsumer SubscribeLite topic:%s 
err:%v", topic, err)
                return err
        }
-       lpc.litePushConsumerSettings.liteTopicSet[topic] = struct{}{}
+       lpc.litePushConsumerSettings.liteTopicSet.Store(topic, struct{}{})
        return nil
 }
 
@@ -127,7 +127,7 @@ func (lpc *defaultLitePushConsumer) UnSubscribeLite(topic 
string) error {
                sugarBaseLogger.Errorf("LitePushConsumer UnSubscribeLite 
topic:%s err:%v", topic, err)
                return err
        }
-       delete(lpc.litePushConsumerSettings.liteTopicSet, topic)
+       lpc.litePushConsumerSettings.liteTopicSet.Delete(topic)
        return nil
 }
 
@@ -140,10 +140,13 @@ func (lpc *defaultLitePushConsumer) checkRunning() error {
 }
 
 func (lpc *defaultLitePushConsumer) syncAllLiteSubscription() {
-       var liteTopicSet = make([]string, 0, 
len(lpc.litePushConsumerSettings.liteTopicSet))
-       for k := range lpc.litePushConsumerSettings.liteTopicSet {
-               liteTopicSet = append(liteTopicSet, k)
-       }
+       liteTopicSet := make([]string, 0, 
lpc.litePushConsumerSettings.maxLiteTopicSize)
+       lpc.litePushConsumerSettings.liteTopicSet.Range(func(key, value 
interface{}) bool {
+               if liteTopic, ok := key.(string); ok {
+                       liteTopicSet = append(liteTopicSet, liteTopic)
+               }
+               return true
+       })
        if len(liteTopicSet) == 0 {
                return
        }
@@ -157,7 +160,7 @@ func (lpc *defaultLitePushConsumer) 
syncLiteSubscription(context context.Context
        request := v2.SyncLiteSubscriptionRequest{
                Action: action,
                Topic: &v2.Resource{
-                       Name:              
lpc.litePushConsumerSettings.bingTopic,
+                       Name:              
lpc.litePushConsumerSettings.bindTopic,
                        ResourceNamespace: lpc.cli.config.NameSpace,
                },
                Group:        lpc.litePushConsumerSettings.groupName,
diff --git a/golang/lite_push_consumer_options.go 
b/golang/lite_push_consumer_options.go
index 0daeb589..e01ae90f 100644
--- a/golang/lite_push_consumer_options.go
+++ b/golang/lite_push_consumer_options.go
@@ -19,6 +19,7 @@ package golang
 
 import (
        "fmt"
+       "sync"
        "time"
 
        v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
@@ -29,9 +30,9 @@ var _ = ClientSettings(&litePushConsumerSettings{})
 
 type litePushConsumerSettings struct {
        *pushConsumerSettings
-       bingTopic             string
-       liteTopicSet          map[string]struct{}
-       liteSubscriptionQuota int32 // default 1200
+       bindTopic             string
+       liteTopicSet          *sync.Map
+       liteSubscriptionQuota int32
        maxLiteTopicSize      int32
        invisibleDuration     time.Duration
 }
@@ -39,11 +40,11 @@ type litePushConsumerSettings struct {
 func newLitePushConsumerSettings(settings *pushConsumerSettings, bindTopic 
string, invisibleDuration time.Duration) *litePushConsumerSettings {
        return &litePushConsumerSettings{
                pushConsumerSettings: settings,
-               bingTopic:            bindTopic,
-               liteTopicSet:         map[string]struct{}{},
+               bindTopic:            bindTopic,
+               liteTopicSet:         &sync.Map{},
                invisibleDuration:    invisibleDuration,
                // default value
-               liteSubscriptionQuota: 1200,
+               liteSubscriptionQuota: 2000,
                maxLiteTopicSize:      64,
        }
 }
diff --git a/golang/lite_push_consumer_test.go 
b/golang/lite_push_consumer_test.go
index f952de09..953159f0 100644
--- a/golang/lite_push_consumer_test.go
+++ b/golang/lite_push_consumer_test.go
@@ -29,31 +29,26 @@ import (
        "google.golang.org/protobuf/types/known/durationpb"
 )
 
-// 全局变量,用于在测试过程中设置和初始化
 var (
        mockCtrl      *gomock.Controller
        mockRpcClient *MockRpcClient
 )
 
-// 设置测试环境,创建并初始化 mock 对象
 func setupTest(t *testing.T) {
        mockCtrl = gomock.NewController(t)
        mockRpcClient = NewMockRpcClient(mockCtrl)
 }
 
-// 清理测试环境
 func teardownTest() {
        mockCtrl.Finish()
 }
 
-// 用于测试的辅助函数,设置标准的成功响应
 func setupSuccessResponse() *v2.SyncLiteSubscriptionResponse {
        return &v2.SyncLiteSubscriptionResponse{
                Status: &v2.Status{Code: v2.Code_OK},
        }
 }
 
-// 用于测试的辅助函数,设置错误响应
 func setupErrorResponse(code v2.Code, message string) 
*v2.SyncLiteSubscriptionResponse {
        return &v2.SyncLiteSubscriptionResponse{
                Status: &v2.Status{
@@ -64,13 +59,11 @@ func setupErrorResponse(code v2.Code, message string) 
*v2.SyncLiteSubscriptionRe
 }
 
 func TestNewLitePushConsumer(t *testing.T) {
-       // 设置测试环境
        setupTest(t)
        defer teardownTest()
 
        config := &Config{Endpoint: fakeAddress, NameSpace: "test-namespace", 
ConsumerGroup: "test-group"}
 
-       // 测试成功创建 LitePushConsumer
        liteConfig := &LitePushConsumerConfig{bindTopic: "bind-topic"}
        lpc, err := NewLitePushConsumer(config, liteConfig, 
WithPushMessageListener(&FuncMessageListener{
                Consume: func(*MessageView) ConsumerResult { return SUCCESS },
@@ -80,29 +73,25 @@ func TestNewLitePushConsumer(t *testing.T) {
        }
 
        dlpc := lpc.(*defaultLitePushConsumer)
-       if dlpc.litePushConsumerSettings.bingTopic != "bind-topic" {
-               t.Errorf("expected bind topic 'bind-topic', got %s", 
dlpc.litePushConsumerSettings.bingTopic)
+       if dlpc.litePushConsumerSettings.bindTopic != "bind-topic" {
+               t.Errorf("expected bind topic 'bind-topic', got %s", 
dlpc.litePushConsumerSettings.bindTopic)
        }
 
-       // 验证 client type 是否正确设置为 LITE_PUSH_CONSUMER
        if int32(dlpc.pcSettings.clientType) != 
int32(v2.ClientType_LITE_PUSH_CONSUMER) {
                t.Errorf("expected client type LITE_PUSH_CONSUMER, got %v", 
dlpc.pcSettings.clientType)
        }
 
-       // 验证 isFifo 被强制设置为 true
        if !dlpc.pcSettings.isFifo {
                t.Error("expected isFifo to be true for lite push consumer")
        }
 }
 
 func TestNewLitePushConsumer_EmptyBindTopic(t *testing.T) {
-       // 设置测试环境
        setupTest(t)
        defer teardownTest()
 
        config := &Config{Endpoint: fakeAddress, NameSpace: "test-namespace", 
ConsumerGroup: "test-group"}
 
-       // 测试空 BindTopic 的错误情况
        liteConfig := &LitePushConsumerConfig{bindTopic: ""}
        _, err := NewLitePushConsumer(config, liteConfig)
        if err == nil {
@@ -115,7 +104,6 @@ func TestNewLitePushConsumer_EmptyBindTopic(t *testing.T) {
        }
 }
 
-// 辅助方法: 创建测试用的 LitePushConsumer 实例
 func createTestLitePushConsumer(t *testing.T) (*defaultLitePushConsumer, 
error) {
        config := &Config{Endpoint: fakeAddress, NameSpace: "test-namespace", 
ConsumerGroup: "test-group"}
        liteConfig := &LitePushConsumerConfig{bindTopic: "bind-topic"}
@@ -129,18 +117,14 @@ func createTestLitePushConsumer(t *testing.T) 
(*defaultLitePushConsumer, error)
 
        dlpc := lpc.(*defaultLitePushConsumer)
 
-       // 模拟客户端已经启动
        dlpc.cli.on.Store(true)
 
-       // 创建完全自定义的 mockedClientManager
        mockedClientManager := &mockedClientManager{
                mockRpcClient: mockRpcClient,
        }
 
-       // 只需要替换一个 clientManager,因为 dlpc.cli 和 dlpc.defaultPushConsumer.cli 
是同一个实例
        dlpc.cli.clientManager = mockedClientManager
 
-       // 验证两个 cli 是否是同一个实例
        if dlpc.cli != dlpc.defaultPushConsumer.cli {
                t.Errorf("Expected dlpc.cli and dlpc.defaultPushConsumer.cli to 
be the same instance")
        }
@@ -148,12 +132,10 @@ func createTestLitePushConsumer(t *testing.T) 
(*defaultLitePushConsumer, error)
        return dlpc, nil
 }
 
-// mockedClientManager 完全实现 ClientManager 接口
 type mockedClientManager struct {
        mockRpcClient *MockRpcClient
 }
 
-// 实现 ClientManager 接口的所有方法
 func (m *mockedClientManager) RegisterClient(client Client)   {}
 func (m *mockedClientManager) UnRegisterClient(client Client) {}
 func (m *mockedClientManager) QueryRoute(ctx context.Context, endpoints 
*v2.Endpoints, request *v2.QueryRouteRequest, duration time.Duration) 
(*v2.QueryRouteResponse, error) {
@@ -190,15 +172,12 @@ func (m *mockedClientManager) 
ForwardMessageToDeadLetterQueue(ctx context.Contex
        return nil, nil
 }
 
-// SyncLiteSubscription 是关键方法,直接使用 mockRpcClient
 func (m *mockedClientManager) SyncLiteSubscription(ctx context.Context, 
endpoints *v2.Endpoints, request *v2.SyncLiteSubscriptionRequest, duration 
time.Duration) (*v2.SyncLiteSubscriptionResponse, error) {
-       // 添加调试日志
        fmt.Printf("DEBUG: mockedClientManager.SyncLiteSubscription called with 
request: %+v\n", request)
        return m.mockRpcClient.SyncLiteSubscription(ctx, request)
 }
 
 func TestLitePushConsumer_SubscribeLite(t *testing.T) {
-       // 设置测试环境
        setupTest(t)
        defer teardownTest()
 
@@ -207,12 +186,10 @@ func TestLitePushConsumer_SubscribeLite(t *testing.T) {
                t.Fatalf("failed to create test lite push consumer: %v", err)
        }
 
-       // 验证 mock 对象是否正确注入
        if dlpc.defaultPushConsumer.cli.clientManager == nil {
                t.Fatal("clientManager should not be nil")
        }
 
-       // Mock SyncLiteSubscription 成功响应 - 简化版本
        mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(), 
gomock.Any()).Return(setupSuccessResponse(), nil).Times(1)
 
        err = dlpc.SubscribeLite("lite-topic-1")
@@ -220,14 +197,12 @@ func TestLitePushConsumer_SubscribeLite(t *testing.T) {
                t.Fatalf("expected no error for SubscribeLite, got %v", err)
        }
 
-       // 验证 lite topic 被添加到 set 中
-       if _, exists := 
dlpc.litePushConsumerSettings.liteTopicSet["lite-topic-1"]; !exists {
+       if _, exists := 
dlpc.litePushConsumerSettings.liteTopicSet.Load("lite-topic-1"); !exists {
                t.Error("expected lite topic to be added to set")
        }
 }
 
 func TestLitePushConsumer_SubscribeLite_NotRunning(t *testing.T) {
-       // 设置测试环境
        setupTest(t)
        defer teardownTest()
 
@@ -236,7 +211,6 @@ func TestLitePushConsumer_SubscribeLite_NotRunning(t 
*testing.T) {
                t.Fatalf("failed to create test lite push consumer: %v", err)
        }
 
-       // 将客户端状态设置为未运行
        dlpc.cli.on.Store(false)
 
        err = dlpc.SubscribeLite("lite-topic-1")
@@ -251,7 +225,6 @@ func TestLitePushConsumer_SubscribeLite_NotRunning(t 
*testing.T) {
 }
 
 func TestLitePushConsumer_SubscribeLite_RpcError(t *testing.T) {
-       // 设置测试环境
        setupTest(t)
        defer teardownTest()
 
@@ -260,24 +233,19 @@ func TestLitePushConsumer_SubscribeLite_RpcError(t 
*testing.T) {
                t.Fatalf("failed to create test lite push consumer: %v", err)
        }
 
-       // Mock RPC 错误
-       mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(), 
gomock.Any()).Return(
-               nil, errors.New("rpc error"),
-       )
+       mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(), 
gomock.Any()).Return(nil, errors.New("rpc error"))
 
        err = dlpc.SubscribeLite("lite-topic-1")
        if err == nil {
                t.Fatal("expected rpc error")
        }
 
-       // 验证 lite topic 没有被添加到 set 中
-       if _, exists := 
dlpc.litePushConsumerSettings.liteTopicSet["lite-topic-1"]; exists {
+       if _, exists := 
dlpc.litePushConsumerSettings.liteTopicSet.Load("lite-topic-1"); exists {
                t.Error("lite topic should not be added when rpc fails")
        }
 }
 
 func TestLitePushConsumer_UnSubscribeLite(t *testing.T) {
-       // 设置测试环境
        setupTest(t)
        defer teardownTest()
 
@@ -286,35 +254,29 @@ func TestLitePushConsumer_UnSubscribeLite(t *testing.T) {
                t.Fatalf("failed to create test lite push consumer: %v", err)
        }
 
-       // 预先添加一个 lite topic
-       dlpc.litePushConsumerSettings.liteTopicSet["lite-topic-1"] = struct{}{}
+       dlpc.litePushConsumerSettings.liteTopicSet.Store("lite-topic-1", 
struct{}{})
 
-       // Mock SyncLiteSubscription 成功响应
-       mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(), 
gomock.Any()).DoAndReturn(
-               func(ctx context.Context, req *v2.SyncLiteSubscriptionRequest) 
(*v2.SyncLiteSubscriptionResponse, error) {
-                       if req.Action != 
v2.LiteSubscriptionAction_PARTIAL_REMOVE {
-                               t.Errorf("expected action INCREMENTAL_REMOVE, 
got %v", req.Action)
-                       }
-                       if len(req.LiteTopicSet) != 1 || req.LiteTopicSet[0] != 
"lite-topic-1" {
-                               t.Errorf("expected lite topic set 
['lite-topic-1'], got %v", req.LiteTopicSet)
-                       }
-                       return setupSuccessResponse(), nil
-               },
-       )
+       mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(), 
gomock.Any()).DoAndReturn(func(ctx context.Context, req 
*v2.SyncLiteSubscriptionRequest) (*v2.SyncLiteSubscriptionResponse, error) {
+               if req.Action != v2.LiteSubscriptionAction_PARTIAL_REMOVE {
+                       t.Errorf("expected action INCREMENTAL_REMOVE, got %v", 
req.Action)
+               }
+               if len(req.LiteTopicSet) != 1 || req.LiteTopicSet[0] != 
"lite-topic-1" {
+                       t.Errorf("expected lite topic set ['lite-topic-1'], got 
%v", req.LiteTopicSet)
+               }
+               return setupSuccessResponse(), nil
+       })
 
        err = dlpc.UnSubscribeLite("lite-topic-1")
        if err != nil {
                t.Fatalf("expected no error for UnSubscribeLite, got %v", err)
        }
 
-       // 验证 lite topic 被从 set 中删除
-       if _, exists := 
dlpc.litePushConsumerSettings.liteTopicSet["lite-topic-1"]; exists {
+       if _, exists := 
dlpc.litePushConsumerSettings.liteTopicSet.Load("lite-topic-1"); exists {
                t.Error("expected lite topic to be removed from set")
        }
 }
 
 func TestLitePushConsumer_notifyUnsubscribeLite(t *testing.T) {
-       // 设置测试环境
        setupTest(t)
        defer teardownTest()
 
@@ -323,8 +285,7 @@ func TestLitePushConsumer_notifyUnsubscribeLite(t 
*testing.T) {
                t.Fatalf("failed to create test lite push consumer: %v", err)
        }
 
-       // 预先添加一个 lite topic
-       dlpc.litePushConsumerSettings.liteTopicSet["lite-topic-notify"] = 
struct{}{}
+       dlpc.litePushConsumerSettings.liteTopicSet.Store("lite-topic-notify", 
struct{}{})
 
        cmd := &v2.NotifyUnsubscribeLiteCommand{
                LiteTopic: "lite-topic-notify",
@@ -332,14 +293,12 @@ func TestLitePushConsumer_notifyUnsubscribeLite(t 
*testing.T) {
 
        dlpc.notifyUnsubscribeLite(cmd)
 
-       // 验证 lite topic 被从 set 中删除
-       if _, exists := 
dlpc.litePushConsumerSettings.liteTopicSet["lite-topic-notify"]; exists {
+       if _, exists := 
dlpc.litePushConsumerSettings.liteTopicSet.Load("lite-topic-notify"); exists {
                t.Error("expected lite topic to be removed from set after 
notify")
        }
 }
 
 func TestLitePushConsumer_notifyUnsubscribeLite_EmptyLiteTopic(t *testing.T) {
-       // 设置测试环境
        setupTest(t)
        defer teardownTest()
 
@@ -348,23 +307,20 @@ func 
TestLitePushConsumer_notifyUnsubscribeLite_EmptyLiteTopic(t *testing.T) {
                t.Fatalf("failed to create test lite push consumer: %v", err)
        }
 
-       // 预先添加一个 lite topic
-       dlpc.litePushConsumerSettings.liteTopicSet["lite-topic-keep"] = 
struct{}{}
+       dlpc.litePushConsumerSettings.liteTopicSet.Store("lite-topic-keep", 
struct{}{})
 
        cmd := &v2.NotifyUnsubscribeLiteCommand{
-               LiteTopic: "", // 空的 lite topic
+               LiteTopic: "",
        }
 
        dlpc.notifyUnsubscribeLite(cmd)
 
-       // 验证 lite topic 仍然存在(因为 LiteTopic 为空,函数会提前返回)
-       if _, exists := 
dlpc.litePushConsumerSettings.liteTopicSet["lite-topic-keep"]; !exists {
+       if _, exists := 
dlpc.litePushConsumerSettings.liteTopicSet.Load("lite-topic-keep"); !exists {
                t.Error("lite topic should not be removed when command has 
empty lite topic")
        }
 }
 
 func TestLitePushConsumer_syncLiteSubscription_StatusError(t *testing.T) {
-       // 设置测试环境
        setupTest(t)
        defer teardownTest()
 
@@ -373,10 +329,7 @@ func 
TestLitePushConsumer_syncLiteSubscription_StatusError(t *testing.T) {
                t.Fatalf("failed to create test lite push consumer: %v", err)
        }
 
-       // Mock SyncLiteSubscription 返回错误状态码
-       mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(), 
gomock.Any()).Return(
-               setupErrorResponse(v2.Code_INTERNAL_SERVER_ERROR, "internal 
error"), nil,
-       )
+       mockRpcClient.EXPECT().SyncLiteSubscription(gomock.Any(), 
gomock.Any()).Return(setupErrorResponse(v2.Code_INTERNAL_SERVER_ERROR, 
"internal error"), nil)
 
        err = dlpc.syncLiteSubscription(context.TODO(), 
v2.LiteSubscriptionAction_PARTIAL_ADD, []string{"test"})
        if err == nil {
@@ -398,7 +351,6 @@ func 
TestLitePushConsumer_syncLiteSubscription_StatusError(t *testing.T) {
 }
 
 func TestLitePushConsumer_WrapReceiveMessageRequest(t *testing.T) {
-       // 设置测试环境
        setupTest(t)
        defer teardownTest()
 
@@ -441,14 +393,12 @@ func TestLitePushConsumer_WrapReceiveMessageRequest(t 
*testing.T) {
                t.Error("expected attempt id to be set")
        }
 
-       // 修正 WrapReceiveMessageRequest 的 duration 断言
        if req.GetLongPollingTimeout().GetSeconds() != int64(10) {
                t.Errorf("expected polling timeout 10s, got %v", 
req.GetLongPollingTimeout().GetSeconds())
        }
 }
 
 func TestLitePushConsumer_WrapHeartbeatRequest(t *testing.T) {
-       // 设置测试环境
        setupTest(t)
        defer teardownTest()
 
@@ -467,14 +417,12 @@ func TestLitePushConsumer_WrapHeartbeatRequest(t 
*testing.T) {
                t.Errorf("expected namespace 'test-namespace', got %s", 
req.GetGroup().GetResourceNamespace())
        }
 
-       // 修正 WrapHeartbeatRequest 的 clientType 断言
        if int32(req.GetClientType()) != 
int32(v2.ClientType_LITE_PUSH_CONSUMER) {
                t.Errorf("expected client type LITE_PUSH_CONSUMER, got %v", 
req.GetClientType())
        }
 }
 
 func TestLitePushConsumerSettings_applySettingsCommand(t *testing.T) {
-       // 设置测试环境
        setupTest(t)
        defer teardownTest()
 
@@ -485,7 +433,6 @@ func TestLitePushConsumerSettings_applySettingsCommand(t 
*testing.T) {
 
        settings := dlpc.litePushConsumerSettings
 
-       // 创建测试 Settings
        liteQuota := int32(100)
        maxSize := int32(1024)
        fifoVal := false
@@ -495,7 +442,7 @@ func TestLitePushConsumerSettings_applySettingsCommand(t 
*testing.T) {
                        Subscription: &v2.Subscription{
                                LiteSubscriptionQuota: &liteQuota,
                                MaxLiteTopicSize:      &maxSize,
-                               Fifo:                  &fifoVal, // 这会被强制设置为 
true
+                               Fifo:                  &fifoVal,
                        },
                },
                BackoffPolicy: &v2.RetryPolicy{
@@ -515,24 +462,20 @@ func TestLitePushConsumerSettings_applySettingsCommand(t 
*testing.T) {
                t.Fatalf("applySettingsCommand failed: %v", err)
        }
 
-       // 验证 lite subscription quota 被设置
        if settings.liteSubscriptionQuota != liteQuota {
                t.Errorf("expected lite subscription quota %d, got %d", 
liteQuota, settings.liteSubscriptionQuota)
        }
 
-       // 验证 max lite topic size 被设置
        if settings.maxLiteTopicSize != maxSize {
                t.Errorf("expected max lite topic size %d, got %d", maxSize, 
settings.maxLiteTopicSize)
        }
 
-       // 验证 isFifo 被强制设置为 true
        if !settings.isFifo {
                t.Error("expected isFifo to be forced to true")
        }
 }
 
 func TestLitePushConsumerSettings_toProtobuf(t *testing.T) {
-       // 设置测试环境
        setupTest(t)
        defer teardownTest()
 
@@ -543,18 +486,15 @@ func TestLitePushConsumerSettings_toProtobuf(t 
*testing.T) {
 
        settings := dlpc.litePushConsumerSettings
 
-       // 设置一些测试值
        settings.liteSubscriptionQuota = 50
        settings.maxLiteTopicSize = 512
 
        protobuf := settings.toProtobuf()
 
-       // 验证 ClientType
        if int32(protobuf.GetClientType()) != 
int32(v2.ClientType_LITE_PUSH_CONSUMER) {
                t.Errorf("expected client type LITE_PUSH_CONSUMER, got %v", 
protobuf.GetClientType())
        }
 
-       // 验证 PubSub 设置
        pubsub := protobuf.GetPubSub()
        if pubsub == nil {
                t.Fatal("expected PubSub to be set")
@@ -565,27 +505,21 @@ func TestLitePushConsumerSettings_toProtobuf(t 
*testing.T) {
                t.Fatal("expected subscription to be set")
        }
 
-       // 验证 LiteSubscriptionQuota
        if subscription.Subscription.GetLiteSubscriptionQuota() != 50 {
                t.Errorf("expected lite subscription quota 50, got %d", 
subscription.Subscription.GetLiteSubscriptionQuota())
        }
 
-       // 验证 MaxLiteTopicSize
        if subscription.Subscription.GetMaxLiteTopicSize() != 512 {
                t.Errorf("expected max lite topic size 512, got %d", 
subscription.Subscription.GetMaxLiteTopicSize())
        }
 
-       // 验证订阅信息
-       if len(subscription.Subscription.GetSubscriptions()) == 0 {
-               t.Error("expected at least one subscription entry")
-       } else {
-               entry := subscription.Subscription.GetSubscriptions()[0]
-               // 修正 subscription entry 断言
-               if entry.GetTopic().GetName() != "bind-topic" {
-                       t.Errorf("expected topic name 'bind-topic', got %s", 
entry.GetTopic().GetName())
-               }
-               if entry.GetTopic().GetResourceNamespace() != "test-namespace" {
-                       t.Errorf("expected namespace 'test-namespace', got %s", 
entry.GetTopic().GetResourceNamespace())
-               }
+       entry := subscription.Subscription.GetSubscriptions()[0]
+
+       if entry.GetTopic().GetName() != "bind-topic" {
+               t.Errorf("expected topic name 'bind-topic', got %s", 
entry.GetTopic().GetName())
+       }
+
+       if entry.GetTopic().GetResourceNamespace() != "test-namespace" {
+               t.Errorf("expected namespace 'test-namespace', got %s", 
entry.GetTopic().GetResourceNamespace())
        }
 }
diff --git a/golang/push_consumer_test.go b/golang/push_consumer_test.go
index ae40917e..45b0ddbd 100644
--- a/golang/push_consumer_test.go
+++ b/golang/push_consumer_test.go
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package golang
 
 import (

Reply via email to