This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 95f5b0d  [ISSUE #108] Add producer unit test (#109)
95f5b0d is described below

commit 95f5b0dd1ce2eaba64e1e1185c5407ef49bc5264
Author: xujianhai666 <[email protected]>
AuthorDate: Wed Jul 10 11:17:09 2019 +0800

    [ISSUE #108] Add producer unit test (#109)
    
    * add producer unit test. resolve #108
    
    * clean line
---
 consumer/consumer.go                  |   2 +-
 consumer/offset_store.go              |   4 +-
 consumer/pull_consumer.go             |   2 +-
 examples/producer/interceptor/main.go |   1 -
 go.mod                                |   2 +-
 go.sum                                |   5 +-
 internal/client.go                    | 106 ++++++---
 internal/client_test.go               |   2 +-
 internal/mock_client.go               | 411 ++++++++++++++++++++++++++++++++++
 internal/route.go                     |   7 +
 producer/producer.go                  |  10 +-
 producer/producer_test.go             | 234 +++++++++++++++++++
 producer/selector_test.go             |   5 +-
 13 files changed, 743 insertions(+), 48 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index a02602a..5bd9674 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -225,7 +225,7 @@ type defaultConsumer struct {
        fromWhere      ConsumeFromWhere
 
        cType     ConsumeType
-       client    *internal.RMQClient
+       client    internal.RMQClient
        mqChanged func(topic string, mqAll, mqDivided []*primitive.MessageQueue)
        state     internal.ServiceState
        pause     bool
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 5c55e27..45d80f2 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -180,11 +180,11 @@ func (local *localFileOffsetStore) remove(mq 
*primitive.MessageQueue) {
 type remoteBrokerOffsetStore struct {
        group       string
        OffsetTable map[string]map[int]*queueOffset `json:"OffsetTable"`
-       client      *internal.RMQClient
+       client      internal.RMQClient
        mutex       sync.RWMutex
 }
 
-func NewRemoteOffsetStore(group string, client *internal.RMQClient) 
OffsetStore {
+func NewRemoteOffsetStore(group string, client internal.RMQClient) OffsetStore 
{
        return &remoteBrokerOffsetStore{
                group:       group,
                client:      client,
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 32cbaed..c8bcdc5 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -41,7 +41,7 @@ var (
 type defaultPullConsumer struct {
        state     internal.ServiceState
        option    consumerOptions
-       client    *internal.RMQClient
+       client    internal.RMQClient
        GroupName string
        Model     MessageModel
        UnitMode  bool
diff --git a/examples/producer/interceptor/main.go 
b/examples/producer/interceptor/main.go
index 4822de9..643d2a8 100644
--- a/examples/producer/interceptor/main.go
+++ b/examples/producer/interceptor/main.go
@@ -42,7 +42,6 @@ func main() {
        }
        for i := 0; i < 10; i++ {
                res, err := p.SendSync(context.Background(), &primitive.Message{
-                       //Topic: "test",
                        Topic:      "TopicTest",
                        Body:       []byte("Hello RocketMQ Go Client!"),
                        Properties: map[string]string{"order": strconv.Itoa(i)},
diff --git a/go.mod b/go.mod
index 684a7d2..322c847 100644
--- a/go.mod
+++ b/go.mod
@@ -4,11 +4,11 @@ go 1.12
 
 require (
        github.com/emirpasic/gods v1.12.0
+       github.com/golang/mock v1.3.1
        github.com/pkg/errors v0.8.1
        github.com/sirupsen/logrus v1.4.1
        github.com/stretchr/testify v1.3.0
        github.com/tidwall/gjson v1.2.1
        github.com/tidwall/match v1.0.1 // indirect
        github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 // indirect
-       golang.org/x/tools v0.0.0-20190606050223-4d9ae51c2468 // indirect
 )
diff --git a/go.sum b/go.sum
index 3b0271b..06d0d34 100644
--- a/go.sum
+++ b/go.sum
@@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/emirpasic/gods v1.12.0 
h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
 github.com/emirpasic/gods v1.12.0/go.mod 
h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
+github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
+github.com/golang/mock v1.3.1/go.mod 
h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1 
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod 
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
@@ -30,5 +32,4 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod 
h1:STP8DvDyc/dI5b8T5h
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a 
h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
-golang.org/x/tools v0.0.0-20190606050223-4d9ae51c2468 
h1:fTfk6GjmihJbK0mSUFgPPgYpsdmApQ86Mcd4GuKax9U=
-golang.org/x/tools v0.0.0-20190606050223-4d9ae51c2468/go.mod 
h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod 
h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
diff --git a/internal/client.go b/internal/client.go
index a6e1bc5..1db43c7 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -118,7 +118,41 @@ func (opt *ClientOptions) String() string {
                opt.InstanceName, opt.UnitMode, opt.UnitName, 
opt.VIPChannelEnabled, opt.ACLEnabled)
 }
 
-type RMQClient struct {
+//go:generate mockgen -source client.go -destination mock_client.go --package 
internal RMQClient
+type RMQClient interface {
+       Start()
+       Shutdown()
+
+       ClientID() string
+
+       RegisterProducer(group string, producer InnerProducer)
+       InvokeSync(addr string, request *remote.RemotingCommand,
+               timeoutMillis time.Duration) (*remote.RemotingCommand, error)
+       InvokeAsync(addr string, request *remote.RemotingCommand,
+               timeoutMillis time.Duration, f func(*remote.RemotingCommand, 
error)) error
+       InvokeOneWay(addr string, request *remote.RemotingCommand,
+               timeoutMillis time.Duration) error
+       CheckClientInBroker()
+       SendHeartbeatToAllBrokerWithLock()
+       UpdateTopicRouteInfo()
+       SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, 
request *SendMessageRequest,
+               msgs []*primitive.Message, f func(result 
*primitive.SendResult)) error
+       SendMessageOneWay(ctx context.Context, brokerAddrs string, request 
*SendMessageRequest,
+               msgs []*primitive.Message) (*primitive.SendResult, error)
+
+       ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, 
resp *primitive.SendResult, msgs ...*primitive.Message)
+
+       RegisterConsumer(group string, consumer InnerConsumer) error
+       UnregisterConsumer(group string)
+       PullMessage(ctx context.Context, brokerAddrs string, request 
*PullMessageRequest) (*primitive.PullResult, error)
+       PullMessageAsync(ctx context.Context, brokerAddrs string, request 
*PullMessageRequest, f func(result *primitive.PullResult)) error
+       RebalanceImmediately()
+       UpdatePublishInfo(topic string, data *TopicRouteData)
+}
+
+var _ RMQClient = new(rmqClient)
+
+type rmqClient struct {
        option ClientOptions
        // group -> InnerProducer
        producerMap sync.Map
@@ -134,8 +168,8 @@ type RMQClient struct {
 
 var clientMap sync.Map
 
-func GetOrNewRocketMQClient(option ClientOptions) *RMQClient {
-       client := &RMQClient{
+func GetOrNewRocketMQClient(option ClientOptions) *rmqClient {
+       client := &rmqClient{
                option:       option,
                remoteClient: remote.NewRemotingClient(),
        }
@@ -147,10 +181,10 @@ func GetOrNewRocketMQClient(option ClientOptions) 
*RMQClient {
                        return nil
                })
        }
-       return actual.(*RMQClient)
+       return actual.(*rmqClient)
 }
 
-func (c *RMQClient) Start() {
+func (c *rmqClient) Start() {
        //ctx, cancel := context.WithCancel(context.Background())
        //c.cancel = cancel
        c.close = false
@@ -162,7 +196,7 @@ func (c *RMQClient) Start() {
                go func() {
                        // delay
                        time.Sleep(50 * time.Millisecond)
-                       for !c.close{
+                       for !c.close {
                                c.UpdateTopicRouteInfo()
                                time.Sleep(_PullNameServerInterval)
                        }
@@ -170,7 +204,7 @@ func (c *RMQClient) Start() {
 
                // TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
                go func() {
-                       for !c.close{
+                       for !c.close {
                                cleanOfflineBroker()
                                c.SendHeartbeatToAllBrokerWithLock()
                                time.Sleep(_HeartbeatBrokerInterval)
@@ -180,7 +214,7 @@ func (c *RMQClient) Start() {
                // schedule persist offset
                go func() {
                        //time.Sleep(10 * time.Second)
-                       for !c.close{
+                       for !c.close {
                                c.consumerMap.Range(func(key, value 
interface{}) bool {
                                        consumer := value.(InnerConsumer)
                                        consumer.PersistConsumerOffset()
@@ -191,7 +225,7 @@ func (c *RMQClient) Start() {
                }()
 
                go func() {
-                       for !c.close{
+                       for !c.close {
                                c.RebalanceImmediately()
                                time.Sleep(_RebalanceInterval)
                        }
@@ -199,12 +233,12 @@ func (c *RMQClient) Start() {
        })
 }
 
-func (c *RMQClient) Shutdown() {
+func (c *rmqClient) Shutdown() {
        c.remoteClient.ShutDown()
        c.close = true
 }
 
-func (c *RMQClient) ClientID() string {
+func (c *rmqClient) ClientID() string {
        id := c.option.ClientIP + "@" + c.option.InstanceName
        if c.option.UnitName != "" {
                id += "@" + c.option.UnitName
@@ -212,7 +246,7 @@ func (c *RMQClient) ClientID() string {
        return id
 }
 
-func (c *RMQClient) InvokeSync(addr string, request *remote.RemotingCommand,
+func (c *rmqClient) InvokeSync(addr string, request *remote.RemotingCommand,
        timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
        if c.close {
                return nil, ErrServiceState
@@ -220,7 +254,7 @@ func (c *RMQClient) InvokeSync(addr string, request 
*remote.RemotingCommand,
        return c.remoteClient.InvokeSync(addr, request, timeoutMillis)
 }
 
-func (c *RMQClient) InvokeAsync(addr string, request *remote.RemotingCommand,
+func (c *rmqClient) InvokeAsync(addr string, request *remote.RemotingCommand,
        timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) 
error {
        if c.close {
                return ErrServiceState
@@ -231,7 +265,7 @@ func (c *RMQClient) InvokeAsync(addr string, request 
*remote.RemotingCommand,
 
 }
 
-func (c *RMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand,
+func (c *rmqClient) InvokeOneWay(addr string, request *remote.RemotingCommand,
        timeoutMillis time.Duration) error {
        if c.close {
                return ErrServiceState
@@ -239,11 +273,11 @@ func (c *RMQClient) InvokeOneWay(addr string, request 
*remote.RemotingCommand,
        return c.remoteClient.InvokeOneWay(addr, request, timeoutMillis)
 }
 
-func (c *RMQClient) CheckClientInBroker() {
+func (c *rmqClient) CheckClientInBroker() {
 }
 
 // TODO
-func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
+func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
        c.hbMutex.Lock()
        defer c.hbMutex.Unlock()
        hbData := &heartbeatData{
@@ -301,7 +335,7 @@ func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
        })
 }
 
-func (c *RMQClient) UpdateTopicRouteInfo() {
+func (c *rmqClient) UpdateTopicRouteInfo() {
        publishTopicSet := make(map[string]bool, 0)
        c.producerMap.Range(func(key, value interface{}) bool {
                producer := value.(InnerProducer)
@@ -328,17 +362,17 @@ func (c *RMQClient) UpdateTopicRouteInfo() {
        })
 
        for topic := range subscribedTopicSet {
-               c.UpdateSubscribeInfo(topic, UpdateTopicRouteInfo(topic))
+               c.updateSubscribeInfo(topic, UpdateTopicRouteInfo(topic))
        }
 }
 
 // SendMessageAsync send message with batch by async
-func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, 
brokerName string, request *SendMessageRequest,
+func (c *rmqClient) SendMessageAsync(ctx context.Context, brokerAddrs, 
brokerName string, request *SendMessageRequest,
        msgs []*primitive.Message, f func(result *primitive.SendResult)) error {
        return nil
 }
 
-func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, 
request *SendMessageRequest,
+func (c *rmqClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, 
request *SendMessageRequest,
        msgs []*primitive.Message) (*primitive.SendResult, error) {
        cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, 
encodeMessages(msgs))
        err := c.remoteClient.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
@@ -348,7 +382,7 @@ func (c *RMQClient) SendMessageOneWay(ctx context.Context, 
brokerAddrs string, r
        return nil, err
 }
 
-func (c *RMQClient) ProcessSendResponse(brokerName string, cmd 
*remote.RemotingCommand, resp *primitive.SendResult, msgs 
...*primitive.Message) {
+func (c *rmqClient) ProcessSendResponse(brokerName string, cmd 
*remote.RemotingCommand, resp *primitive.SendResult, msgs 
...*primitive.Message) {
        var status primitive.SendStatus
        switch cmd.Code {
        case ResFlushDiskTimeout:
@@ -394,7 +428,7 @@ func (c *RMQClient) ProcessSendResponse(brokerName string, 
cmd *remote.RemotingC
 }
 
 // PullMessage with sync
-func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, 
request *PullMessageRequest) (*primitive.PullResult, error) {
+func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, 
request *PullMessageRequest) (*primitive.PullResult, error) {
        cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
        res, err := c.remoteClient.InvokeSync(brokerAddrs, cmd, 3*time.Second)
        if err != nil {
@@ -404,7 +438,7 @@ func (c *RMQClient) PullMessage(ctx context.Context, 
brokerAddrs string, request
        return c.processPullResponse(res)
 }
 
-func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) 
(*primitive.PullResult, error) {
+func (c *rmqClient) processPullResponse(response *remote.RemotingCommand) 
(*primitive.PullResult, error) {
 
        pullResult := &primitive.PullResult{}
        switch response.Code {
@@ -426,7 +460,7 @@ func (c *RMQClient) processPullResponse(response 
*remote.RemotingCommand) (*prim
        return pullResult, nil
 }
 
-func (c *RMQClient) decodeCommandCustomHeader(pr *primitive.PullResult, cmd 
*remote.RemotingCommand) {
+func (c *rmqClient) decodeCommandCustomHeader(pr *primitive.PullResult, cmd 
*remote.RemotingCommand) {
        v, exist := cmd.ExtFields["maxOffset"]
        if exist {
                pr.MaxOffset, _ = strconv.ParseInt(v, 10, 64)
@@ -449,34 +483,34 @@ func (c *RMQClient) decodeCommandCustomHeader(pr 
*primitive.PullResult, cmd *rem
 }
 
 // PullMessageAsync pull message async
-func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, 
request *PullMessageRequest, f func(result *primitive.PullResult)) error {
+func (c *rmqClient) PullMessageAsync(ctx context.Context, brokerAddrs string, 
request *PullMessageRequest, f func(result *primitive.PullResult)) error {
        return nil
 }
 
-func (c *RMQClient) RegisterConsumer(group string, consumer InnerConsumer) 
error {
+func (c *rmqClient) RegisterConsumer(group string, consumer InnerConsumer) 
error {
        c.consumerMap.Store(group, consumer)
        return nil
 }
 
-func (c *RMQClient) UnregisterConsumer(group string) {
+func (c *rmqClient) UnregisterConsumer(group string) {
 }
 
-func (c *RMQClient) RegisterProducer(group string, producer InnerProducer) {
+func (c *rmqClient) RegisterProducer(group string, producer InnerProducer) {
        c.producerMap.Store(group, producer)
 }
 
-func (c *RMQClient) UnregisterProducer(group string) {
+func (c *rmqClient) UnregisterProducer(group string) {
 }
 
-func (c *RMQClient) SelectProducer(group string) InnerProducer {
+func (c *rmqClient) SelectProducer(group string) InnerProducer {
        return nil
 }
 
-func (c *RMQClient) SelectConsumer(group string) InnerConsumer {
+func (c *rmqClient) SelectConsumer(group string) InnerConsumer {
        return nil
 }
 
-func (c *RMQClient) RebalanceImmediately() {
+func (c *rmqClient) RebalanceImmediately() {
        c.consumerMap.Range(func(key, value interface{}) bool {
                consumer := value.(InnerConsumer)
                consumer.Rebalance()
@@ -484,7 +518,7 @@ func (c *RMQClient) RebalanceImmediately() {
        })
 }
 
-func (c *RMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
        if data == nil {
                return
        }
@@ -500,7 +534,7 @@ func (c *RMQClient) UpdatePublishInfo(topic string, data 
*TopicRouteData) {
        })
 }
 
-func (c *RMQClient) isNeedUpdatePublishInfo(topic string) bool {
+func (c *rmqClient) isNeedUpdatePublishInfo(topic string) bool {
        var result bool
        c.producerMap.Range(func(key, value interface{}) bool {
                p := value.(InnerProducer)
@@ -513,7 +547,7 @@ func (c *RMQClient) isNeedUpdatePublishInfo(topic string) 
bool {
        return result
 }
 
-func (c *RMQClient) UpdateSubscribeInfo(topic string, data *TopicRouteData) {
+func (c *rmqClient) updateSubscribeInfo(topic string, data *TopicRouteData) {
        if data == nil {
                return
        }
@@ -528,7 +562,7 @@ func (c *RMQClient) UpdateSubscribeInfo(topic string, data 
*TopicRouteData) {
        })
 }
 
-func (c *RMQClient) isNeedUpdateSubscribeInfo(topic string) bool {
+func (c *rmqClient) isNeedUpdateSubscribeInfo(topic string) bool {
        var result bool
        c.consumerMap.Range(func(key, value interface{}) bool {
                consumer := value.(InnerConsumer)
diff --git a/internal/client_test.go b/internal/client_test.go
index aafbca7..7814ddd 100644
--- a/internal/client_test.go
+++ b/internal/client_test.go
@@ -23,7 +23,7 @@ import (
 )
 
 func TestRMQClient_PullMessage(t *testing.T) {
-       client := GetOrNewRocketMQClient(ClientOption{})
+       client := GetOrNewRocketMQClient(ClientOptions{})
        req := &PullMessageRequest{
                ConsumerGroup:  "testGroup",
                Topic:          "wenfeng",
diff --git a/internal/mock_client.go b/internal/mock_client.go
new file mode 100644
index 0000000..3ee3f50
--- /dev/null
+++ b/internal/mock_client.go
@@ -0,0 +1,411 @@
+// Code generated by MockGen. DO NOT EDIT.
+// Source: client.go
+
+// Package internal is a generated GoMock package.
+package internal
+
+import (
+       context "context"
+       remote "github.com/apache/rocketmq-client-go/internal/remote"
+       primitive "github.com/apache/rocketmq-client-go/primitive"
+       gomock "github.com/golang/mock/gomock"
+       reflect "reflect"
+       time "time"
+)
+
+// MockInnerProducer is a mock of InnerProducer interface
+type MockInnerProducer struct {
+       ctrl     *gomock.Controller
+       recorder *MockInnerProducerMockRecorder
+}
+
+// MockInnerProducerMockRecorder is the mock recorder for MockInnerProducer
+type MockInnerProducerMockRecorder struct {
+       mock *MockInnerProducer
+}
+
+// NewMockInnerProducer creates a new mock instance
+func NewMockInnerProducer(ctrl *gomock.Controller) *MockInnerProducer {
+       mock := &MockInnerProducer{ctrl: ctrl}
+       mock.recorder = &MockInnerProducerMockRecorder{mock}
+       return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use
+func (m *MockInnerProducer) EXPECT() *MockInnerProducerMockRecorder {
+       return m.recorder
+}
+
+// PublishTopicList mocks base method
+func (m *MockInnerProducer) PublishTopicList() []string {
+       ret := m.ctrl.Call(m, "PublishTopicList")
+       ret0, _ := ret[0].([]string)
+       return ret0
+}
+
+// PublishTopicList indicates an expected call of PublishTopicList
+func (mr *MockInnerProducerMockRecorder) PublishTopicList() *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"PublishTopicList", reflect.TypeOf((*MockInnerProducer)(nil).PublishTopicList))
+}
+
+// UpdateTopicPublishInfo mocks base method
+func (m *MockInnerProducer) UpdateTopicPublishInfo(topic string, info 
*TopicPublishInfo) {
+       m.ctrl.Call(m, "UpdateTopicPublishInfo", topic, info)
+}
+
+// UpdateTopicPublishInfo indicates an expected call of UpdateTopicPublishInfo
+func (mr *MockInnerProducerMockRecorder) UpdateTopicPublishInfo(topic, info 
interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"UpdateTopicPublishInfo", 
reflect.TypeOf((*MockInnerProducer)(nil).UpdateTopicPublishInfo), topic, info)
+}
+
+// IsPublishTopicNeedUpdate mocks base method
+func (m *MockInnerProducer) IsPublishTopicNeedUpdate(topic string) bool {
+       ret := m.ctrl.Call(m, "IsPublishTopicNeedUpdate", topic)
+       ret0, _ := ret[0].(bool)
+       return ret0
+}
+
+// IsPublishTopicNeedUpdate indicates an expected call of 
IsPublishTopicNeedUpdate
+func (mr *MockInnerProducerMockRecorder) IsPublishTopicNeedUpdate(topic 
interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"IsPublishTopicNeedUpdate", 
reflect.TypeOf((*MockInnerProducer)(nil).IsPublishTopicNeedUpdate), topic)
+}
+
+// IsUnitMode mocks base method
+func (m *MockInnerProducer) IsUnitMode() bool {
+       ret := m.ctrl.Call(m, "IsUnitMode")
+       ret0, _ := ret[0].(bool)
+       return ret0
+}
+
+// IsUnitMode indicates an expected call of IsUnitMode
+func (mr *MockInnerProducerMockRecorder) IsUnitMode() *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnitMode", 
reflect.TypeOf((*MockInnerProducer)(nil).IsUnitMode))
+}
+
+// MockInnerConsumer is a mock of InnerConsumer interface
+type MockInnerConsumer struct {
+       ctrl     *gomock.Controller
+       recorder *MockInnerConsumerMockRecorder
+}
+
+// MockInnerConsumerMockRecorder is the mock recorder for MockInnerConsumer
+type MockInnerConsumerMockRecorder struct {
+       mock *MockInnerConsumer
+}
+
+// NewMockInnerConsumer creates a new mock instance
+func NewMockInnerConsumer(ctrl *gomock.Controller) *MockInnerConsumer {
+       mock := &MockInnerConsumer{ctrl: ctrl}
+       mock.recorder = &MockInnerConsumerMockRecorder{mock}
+       return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use
+func (m *MockInnerConsumer) EXPECT() *MockInnerConsumerMockRecorder {
+       return m.recorder
+}
+
+// PersistConsumerOffset mocks base method
+func (m *MockInnerConsumer) PersistConsumerOffset() {
+       m.ctrl.Call(m, "PersistConsumerOffset")
+}
+
+// PersistConsumerOffset indicates an expected call of PersistConsumerOffset
+func (mr *MockInnerConsumerMockRecorder) PersistConsumerOffset() *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"PersistConsumerOffset", 
reflect.TypeOf((*MockInnerConsumer)(nil).PersistConsumerOffset))
+}
+
+// UpdateTopicSubscribeInfo mocks base method
+func (m *MockInnerConsumer) UpdateTopicSubscribeInfo(topic string, mqs 
[]*primitive.MessageQueue) {
+       m.ctrl.Call(m, "UpdateTopicSubscribeInfo", topic, mqs)
+}
+
+// UpdateTopicSubscribeInfo indicates an expected call of 
UpdateTopicSubscribeInfo
+func (mr *MockInnerConsumerMockRecorder) UpdateTopicSubscribeInfo(topic, mqs 
interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"UpdateTopicSubscribeInfo", 
reflect.TypeOf((*MockInnerConsumer)(nil).UpdateTopicSubscribeInfo), topic, mqs)
+}
+
+// IsSubscribeTopicNeedUpdate mocks base method
+func (m *MockInnerConsumer) IsSubscribeTopicNeedUpdate(topic string) bool {
+       ret := m.ctrl.Call(m, "IsSubscribeTopicNeedUpdate", topic)
+       ret0, _ := ret[0].(bool)
+       return ret0
+}
+
+// IsSubscribeTopicNeedUpdate indicates an expected call of 
IsSubscribeTopicNeedUpdate
+func (mr *MockInnerConsumerMockRecorder) IsSubscribeTopicNeedUpdate(topic 
interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"IsSubscribeTopicNeedUpdate", 
reflect.TypeOf((*MockInnerConsumer)(nil).IsSubscribeTopicNeedUpdate), topic)
+}
+
+// SubscriptionDataList mocks base method
+func (m *MockInnerConsumer) SubscriptionDataList() []*SubscriptionData {
+       ret := m.ctrl.Call(m, "SubscriptionDataList")
+       ret0, _ := ret[0].([]*SubscriptionData)
+       return ret0
+}
+
+// SubscriptionDataList indicates an expected call of SubscriptionDataList
+func (mr *MockInnerConsumerMockRecorder) SubscriptionDataList() *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"SubscriptionDataList", 
reflect.TypeOf((*MockInnerConsumer)(nil).SubscriptionDataList))
+}
+
+// Rebalance mocks base method
+func (m *MockInnerConsumer) Rebalance() {
+       m.ctrl.Call(m, "Rebalance")
+}
+
+// Rebalance indicates an expected call of Rebalance
+func (mr *MockInnerConsumerMockRecorder) Rebalance() *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Rebalance", 
reflect.TypeOf((*MockInnerConsumer)(nil).Rebalance))
+}
+
+// IsUnitMode mocks base method
+func (m *MockInnerConsumer) IsUnitMode() bool {
+       ret := m.ctrl.Call(m, "IsUnitMode")
+       ret0, _ := ret[0].(bool)
+       return ret0
+}
+
+// IsUnitMode indicates an expected call of IsUnitMode
+func (mr *MockInnerConsumerMockRecorder) IsUnitMode() *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnitMode", 
reflect.TypeOf((*MockInnerConsumer)(nil).IsUnitMode))
+}
+
+// MockRMQClient is a mock of RMQClient interface
+type MockRMQClient struct {
+       ctrl     *gomock.Controller
+       recorder *MockRMQClientMockRecorder
+}
+
+// MockRMQClientMockRecorder is the mock recorder for MockRMQClient
+type MockRMQClientMockRecorder struct {
+       mock *MockRMQClient
+}
+
+// NewMockRMQClient creates a new mock instance
+func NewMockRMQClient(ctrl *gomock.Controller) *MockRMQClient {
+       mock := &MockRMQClient{ctrl: ctrl}
+       mock.recorder = &MockRMQClientMockRecorder{mock}
+       return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use
+func (m *MockRMQClient) EXPECT() *MockRMQClientMockRecorder {
+       return m.recorder
+}
+
+// Start mocks base method
+func (m *MockRMQClient) Start() {
+       m.ctrl.Call(m, "Start")
+}
+
+// Start indicates an expected call of Start
+func (mr *MockRMQClientMockRecorder) Start() *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", 
reflect.TypeOf((*MockRMQClient)(nil).Start))
+}
+
+// Shutdown mocks base method
+func (m *MockRMQClient) Shutdown() {
+       m.ctrl.Call(m, "Shutdown")
+}
+
+// Shutdown indicates an expected call of Shutdown
+func (mr *MockRMQClientMockRecorder) Shutdown() *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Shutdown", 
reflect.TypeOf((*MockRMQClient)(nil).Shutdown))
+}
+
+// ClientID mocks base method
+func (m *MockRMQClient) ClientID() string {
+       ret := m.ctrl.Call(m, "ClientID")
+       ret0, _ := ret[0].(string)
+       return ret0
+}
+
+// ClientID indicates an expected call of ClientID
+func (mr *MockRMQClientMockRecorder) ClientID() *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientID", 
reflect.TypeOf((*MockRMQClient)(nil).ClientID))
+}
+
+// RegisterProducer mocks base method
+func (m *MockRMQClient) RegisterProducer(group string, producer InnerProducer) 
{
+       m.ctrl.Call(m, "RegisterProducer", group, producer)
+}
+
+// RegisterProducer indicates an expected call of RegisterProducer
+func (mr *MockRMQClientMockRecorder) RegisterProducer(group, producer 
interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"RegisterProducer", reflect.TypeOf((*MockRMQClient)(nil).RegisterProducer), 
group, producer)
+}
+
+// InvokeSync mocks base method
+func (m *MockRMQClient) InvokeSync(addr string, request 
*remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, 
error) {
+       ret := m.ctrl.Call(m, "InvokeSync", addr, request, timeoutMillis)
+       ret0, _ := ret[0].(*remote.RemotingCommand)
+       ret1, _ := ret[1].(error)
+       return ret0, ret1
+}
+
+// InvokeSync indicates an expected call of InvokeSync
+func (mr *MockRMQClientMockRecorder) InvokeSync(addr, request, timeoutMillis 
interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", 
reflect.TypeOf((*MockRMQClient)(nil).InvokeSync), addr, request, timeoutMillis)
+}
+
+// InvokeAsync mocks base method
+func (m *MockRMQClient) InvokeAsync(addr string, request 
*remote.RemotingCommand, timeoutMillis time.Duration, f 
func(*remote.RemotingCommand, error)) error {
+       ret := m.ctrl.Call(m, "InvokeAsync", addr, request, timeoutMillis, f)
+       ret0, _ := ret[0].(error)
+       return ret0
+}
+
+// InvokeAsync indicates an expected call of InvokeAsync
+func (mr *MockRMQClientMockRecorder) InvokeAsync(addr, request, timeoutMillis, 
f interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", 
reflect.TypeOf((*MockRMQClient)(nil).InvokeAsync), addr, request, 
timeoutMillis, f)
+}
+
+// InvokeOneWay mocks base method
+func (m *MockRMQClient) InvokeOneWay(addr string, request 
*remote.RemotingCommand, timeoutMillis time.Duration) error {
+       ret := m.ctrl.Call(m, "InvokeOneWay", addr, request, timeoutMillis)
+       ret0, _ := ret[0].(error)
+       return ret0
+}
+
+// InvokeOneWay indicates an expected call of InvokeOneWay
+func (mr *MockRMQClientMockRecorder) InvokeOneWay(addr, request, timeoutMillis 
interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", 
reflect.TypeOf((*MockRMQClient)(nil).InvokeOneWay), addr, request, 
timeoutMillis)
+}
+
+// CheckClientInBroker mocks base method
+func (m *MockRMQClient) CheckClientInBroker() {
+       m.ctrl.Call(m, "CheckClientInBroker")
+}
+
+// CheckClientInBroker indicates an expected call of CheckClientInBroker
+func (mr *MockRMQClientMockRecorder) CheckClientInBroker() *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"CheckClientInBroker", 
reflect.TypeOf((*MockRMQClient)(nil).CheckClientInBroker))
+}
+
+// SendHeartbeatToAllBrokerWithLock mocks base method
+func (m *MockRMQClient) SendHeartbeatToAllBrokerWithLock() {
+       m.ctrl.Call(m, "SendHeartbeatToAllBrokerWithLock")
+}
+
+// SendHeartbeatToAllBrokerWithLock indicates an expected call of 
SendHeartbeatToAllBrokerWithLock
+func (mr *MockRMQClientMockRecorder) SendHeartbeatToAllBrokerWithLock() 
*gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"SendHeartbeatToAllBrokerWithLock", 
reflect.TypeOf((*MockRMQClient)(nil).SendHeartbeatToAllBrokerWithLock))
+}
+
+// UpdateTopicRouteInfo mocks base method
+func (m *MockRMQClient) UpdateTopicRouteInfo() {
+       m.ctrl.Call(m, "UpdateTopicRouteInfo")
+}
+
+// UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo
+func (mr *MockRMQClientMockRecorder) UpdateTopicRouteInfo() *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"UpdateTopicRouteInfo", 
reflect.TypeOf((*MockRMQClient)(nil).UpdateTopicRouteInfo))
+}
+
+// SendMessageAsync mocks base method
+func (m *MockRMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, 
brokerName string, request *SendMessageRequest, msgs []*primitive.Message, f 
func(*primitive.SendResult)) error {
+       ret := m.ctrl.Call(m, "SendMessageAsync", ctx, brokerAddrs, brokerName, 
request, msgs, f)
+       ret0, _ := ret[0].(error)
+       return ret0
+}
+
+// SendMessageAsync indicates an expected call of SendMessageAsync
+func (mr *MockRMQClientMockRecorder) SendMessageAsync(ctx, brokerAddrs, 
brokerName, request, msgs, f interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"SendMessageAsync", reflect.TypeOf((*MockRMQClient)(nil).SendMessageAsync), 
ctx, brokerAddrs, brokerName, request, msgs, f)
+}
+
+// SendMessageOneWay mocks base method
+func (m *MockRMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs 
string, request *SendMessageRequest, msgs []*primitive.Message) 
(*primitive.SendResult, error) {
+       ret := m.ctrl.Call(m, "SendMessageOneWay", ctx, brokerAddrs, request, 
msgs)
+       ret0, _ := ret[0].(*primitive.SendResult)
+       ret1, _ := ret[1].(error)
+       return ret0, ret1
+}
+
+// SendMessageOneWay indicates an expected call of SendMessageOneWay
+func (mr *MockRMQClientMockRecorder) SendMessageOneWay(ctx, brokerAddrs, 
request, msgs interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"SendMessageOneWay", reflect.TypeOf((*MockRMQClient)(nil).SendMessageOneWay), 
ctx, brokerAddrs, request, msgs)
+}
+
+// ProcessSendResponse mocks base method
+func (m *MockRMQClient) ProcessSendResponse(brokerName string, cmd 
*remote.RemotingCommand, resp *primitive.SendResult, msgs 
...*primitive.Message) {
+       varargs := []interface{}{brokerName, cmd, resp}
+       for _, a := range msgs {
+               varargs = append(varargs, a)
+       }
+       m.ctrl.Call(m, "ProcessSendResponse", varargs...)
+}
+
+// ProcessSendResponse indicates an expected call of ProcessSendResponse
+func (mr *MockRMQClientMockRecorder) ProcessSendResponse(brokerName, cmd, resp 
interface{}, msgs ...interface{}) *gomock.Call {
+       varargs := append([]interface{}{brokerName, cmd, resp}, msgs...)
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"ProcessSendResponse", 
reflect.TypeOf((*MockRMQClient)(nil).ProcessSendResponse), varargs...)
+}
+
+// RegisterConsumer mocks base method
+func (m *MockRMQClient) RegisterConsumer(group string, consumer InnerConsumer) 
error {
+       ret := m.ctrl.Call(m, "RegisterConsumer", group, consumer)
+       ret0, _ := ret[0].(error)
+       return ret0
+}
+
+// RegisterConsumer indicates an expected call of RegisterConsumer
+func (mr *MockRMQClientMockRecorder) RegisterConsumer(group, consumer 
interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"RegisterConsumer", reflect.TypeOf((*MockRMQClient)(nil).RegisterConsumer), 
group, consumer)
+}
+
+// UnregisterConsumer mocks base method
+func (m *MockRMQClient) UnregisterConsumer(group string) {
+       m.ctrl.Call(m, "UnregisterConsumer", group)
+}
+
+// UnregisterConsumer indicates an expected call of UnregisterConsumer
+func (mr *MockRMQClientMockRecorder) UnregisterConsumer(group interface{}) 
*gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"UnregisterConsumer", reflect.TypeOf((*MockRMQClient)(nil).UnregisterConsumer), 
group)
+}
+
+// PullMessage mocks base method
+func (m *MockRMQClient) PullMessage(ctx context.Context, brokerAddrs string, 
request *PullMessageRequest) (*primitive.PullResult, error) {
+       ret := m.ctrl.Call(m, "PullMessage", ctx, brokerAddrs, request)
+       ret0, _ := ret[0].(*primitive.PullResult)
+       ret1, _ := ret[1].(error)
+       return ret0, ret1
+}
+
+// PullMessage indicates an expected call of PullMessage
+func (mr *MockRMQClientMockRecorder) PullMessage(ctx, brokerAddrs, request 
interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullMessage", 
reflect.TypeOf((*MockRMQClient)(nil).PullMessage), ctx, brokerAddrs, request)
+}
+
+// PullMessageAsync mocks base method
+func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs 
string, request *PullMessageRequest, f func(*primitive.PullResult)) error {
+       ret := m.ctrl.Call(m, "PullMessageAsync", ctx, brokerAddrs, request, f)
+       ret0, _ := ret[0].(error)
+       return ret0
+}
+
+// PullMessageAsync indicates an expected call of PullMessageAsync
+func (mr *MockRMQClientMockRecorder) PullMessageAsync(ctx, brokerAddrs, 
request, f interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"PullMessageAsync", reflect.TypeOf((*MockRMQClient)(nil).PullMessageAsync), 
ctx, brokerAddrs, request, f)
+}
+
+// RebalanceImmediately mocks base method
+func (m *MockRMQClient) RebalanceImmediately() {
+       m.ctrl.Call(m, "RebalanceImmediately")
+}
+
+// RebalanceImmediately indicates an expected call of RebalanceImmediately
+func (mr *MockRMQClientMockRecorder) RebalanceImmediately() *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"RebalanceImmediately", 
reflect.TypeOf((*MockRMQClient)(nil).RebalanceImmediately))
+}
+
+// UpdatePublishInfo mocks base method
+func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+       m.ctrl.Call(m, "UpdatePublishInfo", topic, data)
+}
+
+// UpdatePublishInfo indicates an expected call of UpdatePublishInfo
+func (mr *MockRMQClientMockRecorder) UpdatePublishInfo(topic, data 
interface{}) *gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"UpdatePublishInfo", reflect.TypeOf((*MockRMQClient)(nil).UpdatePublishInfo), 
topic, data)
+}
diff --git a/internal/route.go b/internal/route.go
index 074cf90..dbb39c1 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -157,6 +157,13 @@ func UpdateTopicRouteInfo(topic string) *TopicRouteData {
        return routeData.clone()
 }
 
+// just for test
+func AddBroker(routeData *TopicRouteData) {
+       for _, brokerData := range routeData.BrokerDataList {
+               brokerAddressesMap.Store(brokerData.BrokerName, brokerData)
+       }
+}
+
 func FindBrokerAddrByTopic(topic string) string {
        v, exist := routeDataMap.Load(topic)
        if !exist {
diff --git a/producer/producer.go b/producer/producer.go
index 47c4026..ab501de 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -33,6 +33,7 @@ import (
 var (
        ErrTopicEmpty   = errors.New("topic is nil")
        ErrMessageEmpty = errors.New("message is nil")
+       ErrNotRunning   = errors.New("producer not started")
 )
 
 func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
@@ -84,7 +85,7 @@ func getChainedInterceptor(interceptors 
[]primitive.Interceptor, cur int, finalI
 
 type defaultProducer struct {
        group       string
-       client      *internal.RMQClient
+       client      internal.RMQClient
        state       internal.ServiceState
        options     producerOptions
        publishInfo sync.Map
@@ -100,10 +101,16 @@ func (p *defaultProducer) Start() error {
 }
 
 func (p *defaultProducer) Shutdown() error {
+       p.state = internal.StateShutdown
+       p.client.Shutdown()
        return nil
 }
 
 func (p *defaultProducer) checkMsg(msg *primitive.Message) error {
+       if p.state != internal.StateRunning {
+               return ErrNotRunning
+       }
+
        if msg == nil {
                return errors.New("message is nil")
        }
@@ -242,6 +249,7 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, 
msg *primitive.Message
                        err = _err
                        continue
                }
+               return nil
        }
        return err
 }
diff --git a/producer/producer_test.go b/producer/producer_test.go
new file mode 100644
index 0000000..0dd564f
--- /dev/null
+++ b/producer/producer_test.go
@@ -0,0 +1,234 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package producer
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       "github.com/apache/rocketmq-client-go/internal"
+       "github.com/apache/rocketmq-client-go/internal/remote"
+       "github.com/apache/rocketmq-client-go/primitive"
+       "github.com/golang/mock/gomock"
+       "github.com/stretchr/testify/assert"
+)
+
+const (
+       topic = "TopicTest"
+)
+
+func TestShutdown(t *testing.T) {
+       p, _ := NewDefaultProducer(
+               WithNameServer([]string{"127.0.0.1:9876"}),
+               WithRetry(2),
+               WithQueueSelector(NewManualQueueSelector()),
+       )
+
+       ctrl := gomock.NewController(t)
+       defer ctrl.Finish()
+       client := internal.NewMockRMQClient(ctrl)
+       p.client = client
+
+       client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+       client.EXPECT().Start().Return()
+       err := p.Start()
+       assert.Nil(t, err)
+
+       client.EXPECT().Shutdown().Return()
+       err = p.Shutdown()
+       assert.Nil(t, err)
+
+       ctx := context.Background()
+       msg := new(primitive.Message)
+
+       r, err := p.SendSync(ctx, msg)
+       assert.Equal(t, ErrNotRunning, err)
+       assert.Nil(t, r)
+
+       err = p.SendOneWay(ctx, msg)
+       assert.Equal(t, ErrNotRunning, err)
+       assert.Nil(t, r)
+
+       f := func(context.Context, *primitive.SendResult, error) {
+               assert.False(t, true, "should not  come in")
+       }
+       err = p.SendAsync(ctx, f, msg)
+       assert.Equal(t, ErrNotRunning, err)
+       assert.Nil(t, r)
+}
+
+func mockB4Send(p *defaultProducer) {
+       p.publishInfo.Store(topic, &internal.TopicPublishInfo{
+               HaveTopicRouterInfo: true,
+               MqList: []*primitive.MessageQueue{
+                       {
+                               Topic:      topic,
+                               BrokerName: "aa",
+                               QueueId:    0,
+                       },
+               },
+       })
+       internal.AddBroker(&internal.TopicRouteData{
+               BrokerDataList: []*internal.BrokerData{
+                       {
+                               Cluster:    "cluster",
+                               BrokerName: "aa",
+                               BrokerAddresses: map[int64]string{
+                                       0: "1",
+                               },
+                       },
+               },
+       })
+}
+
+func TestSync(t *testing.T) {
+       p, _ := NewDefaultProducer(
+               WithNameServer([]string{"127.0.0.1:9876"}),
+               WithRetry(2),
+               WithQueueSelector(NewManualQueueSelector()),
+       )
+
+       ctrl := gomock.NewController(t)
+       defer ctrl.Finish()
+       client := internal.NewMockRMQClient(ctrl)
+       p.client = client
+
+       client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+       client.EXPECT().Start().Return()
+       err := p.Start()
+       assert.Nil(t, err)
+
+       ctx := context.Background()
+       msg := &primitive.Message{
+               Topic:      topic,
+               Body:       []byte("this is a message body"),
+               Properties: map[string]string{"key": "value"},
+               QueueID:    0,
+       }
+
+       expectedResp := &primitive.SendResult{
+               Status:      primitive.SendOK,
+               MsgID:       "111",
+               QueueOffset: 0,
+               OffsetMsgID: "0",
+       }
+
+       mockB4Send(p)
+
+       client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil, nil)
+       client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), 
gomock.Any(), gomock.Any()).Do(
+               func(brokerName string, cmd *remote.RemotingCommand, resp 
*primitive.SendResult, msgs ...*primitive.Message) {
+                       resp.Status = expectedResp.Status
+                       resp.MsgID = expectedResp.MsgID
+                       resp.QueueOffset = expectedResp.QueueOffset
+                       resp.OffsetMsgID = expectedResp.OffsetMsgID
+               })
+       resp, err := p.SendSync(ctx, msg)
+       assert.Nil(t, err)
+       assert.Equal(t, expectedResp, resp)
+}
+
+func TestASync(t *testing.T) {
+       p, _ := NewDefaultProducer(
+               WithNameServer([]string{"127.0.0.1:9876"}),
+               WithRetry(2),
+               WithQueueSelector(NewManualQueueSelector()),
+       )
+
+       ctrl := gomock.NewController(t)
+       defer ctrl.Finish()
+       client := internal.NewMockRMQClient(ctrl)
+       p.client = client
+
+       client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+       client.EXPECT().Start().Return()
+       err := p.Start()
+       assert.Nil(t, err)
+
+       ctx := context.Background()
+       msg := &primitive.Message{
+               Topic:      topic,
+               Body:       []byte("this is a message body"),
+               Properties: map[string]string{"key": "value"},
+       }
+
+       expectedResp := &primitive.SendResult{
+               Status:      primitive.SendOK,
+               MsgID:       "111",
+               QueueOffset: 0,
+               OffsetMsgID: "0",
+       }
+
+       f := func(ctx context.Context, resp *primitive.SendResult, err error) {
+               assert.Nil(t, err)
+               assert.Equal(t, expectedResp, resp)
+       }
+
+       mockB4Send(p)
+
+       client.EXPECT().InvokeAsync(gomock.Any(), gomock.Any(), gomock.Any(), 
gomock.Any()).DoAndReturn(
+               func(addr string, request *remote.RemotingCommand,
+                       timeoutMillis time.Duration, f 
func(*remote.RemotingCommand, error)) error {
+                       // mock invoke callback
+                       f(nil, nil)
+                       return nil
+               })
+       client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), 
gomock.Any(), gomock.Any()).Do(
+               func(brokerName string, cmd *remote.RemotingCommand, resp 
*primitive.SendResult, msgs ...*primitive.Message) {
+                       resp.Status = expectedResp.Status
+                       resp.MsgID = expectedResp.MsgID
+                       resp.QueueOffset = expectedResp.QueueOffset
+                       resp.OffsetMsgID = expectedResp.OffsetMsgID
+               })
+
+       err = p.SendAsync(ctx, f, msg)
+       assert.Nil(t, err)
+}
+
+func TestOneway(t *testing.T) {
+       p, _ := NewDefaultProducer(
+               WithNameServer([]string{"127.0.0.1:9876"}),
+               WithRetry(2),
+               WithQueueSelector(NewManualQueueSelector()),
+       )
+
+       ctrl := gomock.NewController(t)
+       defer ctrl.Finish()
+       client := internal.NewMockRMQClient(ctrl)
+       p.client = client
+
+       client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+       client.EXPECT().Start().Return()
+       err := p.Start()
+       assert.Nil(t, err)
+
+       ctx := context.Background()
+       msg := &primitive.Message{
+               Topic:      topic,
+               Body:       []byte("this is a message body"),
+               Properties: map[string]string{"key": "value"},
+       }
+
+       mockB4Send(p)
+
+       client.EXPECT().InvokeOneWay(gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil).AnyTimes()
+
+       err = p.SendOneWay(ctx, msg)
+       assert.Nil(t, err)
+}
diff --git a/producer/selector_test.go b/producer/selector_test.go
index 016db07..723c55b 100644
--- a/producer/selector_test.go
+++ b/producer/selector_test.go
@@ -20,6 +20,7 @@ package producer
 import (
        "testing"
 
+       "github.com/apache/rocketmq-client-go/primitive"
        "github.com/stretchr/testify/assert"
 )
 
@@ -27,10 +28,10 @@ func TestRoundRobin(t *testing.T) {
        queues := 10
        s := NewRoundRobinQueueSelector()
 
-       m := &Message{
+       m := &primitive.Message{
                Topic: "test",
        }
-       mrr := &Message{
+       mrr := &primitive.Message{
                Topic: "rr",
        }
        for i := 0; i < 100; i++ {

Reply via email to