This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 95f5b0d [ISSUE #108] Add producer unit test (#109)
95f5b0d is described below
commit 95f5b0dd1ce2eaba64e1e1185c5407ef49bc5264
Author: xujianhai666 <[email protected]>
AuthorDate: Wed Jul 10 11:17:09 2019 +0800
[ISSUE #108] Add producer unit test (#109)
* add producer unit test. resolve #108
* clean line
---
consumer/consumer.go | 2 +-
consumer/offset_store.go | 4 +-
consumer/pull_consumer.go | 2 +-
examples/producer/interceptor/main.go | 1 -
go.mod | 2 +-
go.sum | 5 +-
internal/client.go | 106 ++++++---
internal/client_test.go | 2 +-
internal/mock_client.go | 411 ++++++++++++++++++++++++++++++++++
internal/route.go | 7 +
producer/producer.go | 10 +-
producer/producer_test.go | 234 +++++++++++++++++++
producer/selector_test.go | 5 +-
13 files changed, 743 insertions(+), 48 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index a02602a..5bd9674 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -225,7 +225,7 @@ type defaultConsumer struct {
fromWhere ConsumeFromWhere
cType ConsumeType
- client *internal.RMQClient
+ client internal.RMQClient
mqChanged func(topic string, mqAll, mqDivided []*primitive.MessageQueue)
state internal.ServiceState
pause bool
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 5c55e27..45d80f2 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -180,11 +180,11 @@ func (local *localFileOffsetStore) remove(mq *primitive.MessageQueue) {
type remoteBrokerOffsetStore struct {
group string
OffsetTable map[string]map[int]*queueOffset `json:"OffsetTable"`
- client *internal.RMQClient
+ client internal.RMQClient
mutex sync.RWMutex
}
-func NewRemoteOffsetStore(group string, client *internal.RMQClient) OffsetStore {
+func NewRemoteOffsetStore(group string, client internal.RMQClient) OffsetStore {
return &remoteBrokerOffsetStore{
group: group,
client: client,
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 32cbaed..c8bcdc5 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -41,7 +41,7 @@ var (
type defaultPullConsumer struct {
state internal.ServiceState
option consumerOptions
- client *internal.RMQClient
+ client internal.RMQClient
GroupName string
Model MessageModel
UnitMode bool
diff --git a/examples/producer/interceptor/main.go b/examples/producer/interceptor/main.go
index 4822de9..643d2a8 100644
--- a/examples/producer/interceptor/main.go
+++ b/examples/producer/interceptor/main.go
@@ -42,7 +42,6 @@ func main() {
}
for i := 0; i < 10; i++ {
res, err := p.SendSync(context.Background(), &primitive.Message{
- //Topic: "test",
Topic: "TopicTest",
Body: []byte("Hello RocketMQ Go Client!"),
Properties: map[string]string{"order": strconv.Itoa(i)},
diff --git a/go.mod b/go.mod
index 684a7d2..322c847 100644
--- a/go.mod
+++ b/go.mod
@@ -4,11 +4,11 @@ go 1.12
require (
github.com/emirpasic/gods v1.12.0
+ github.com/golang/mock v1.3.1
github.com/pkg/errors v0.8.1
github.com/sirupsen/logrus v1.4.1
github.com/stretchr/testify v1.3.0
github.com/tidwall/gjson v1.2.1
github.com/tidwall/match v1.0.1 // indirect
github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 // indirect
- golang.org/x/tools v0.0.0-20190606050223-4d9ae51c2468 // indirect
)
diff --git a/go.sum b/go.sum
index 3b0271b..06d0d34 100644
--- a/go.sum
+++ b/go.sum
@@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
+github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
+github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
@@ -30,5 +32,4 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
-golang.org/x/tools v0.0.0-20190606050223-4d9ae51c2468 h1:fTfk6GjmihJbK0mSUFgPPgYpsdmApQ86Mcd4GuKax9U=
-golang.org/x/tools v0.0.0-20190606050223-4d9ae51c2468/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
diff --git a/internal/client.go b/internal/client.go
index a6e1bc5..1db43c7 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -118,7 +118,41 @@ func (opt *ClientOptions) String() string {
opt.InstanceName, opt.UnitMode, opt.UnitName,
opt.VIPChannelEnabled, opt.ACLEnabled)
}
-type RMQClient struct {
+//go:generate mockgen -source client.go -destination mock_client.go --package internal RMQClient
+type RMQClient interface {
+ Start()
+ Shutdown()
+
+ ClientID() string
+
+ RegisterProducer(group string, producer InnerProducer)
+ InvokeSync(addr string, request *remote.RemotingCommand,
+ timeoutMillis time.Duration) (*remote.RemotingCommand, error)
+ InvokeAsync(addr string, request *remote.RemotingCommand,
+ timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error
+ InvokeOneWay(addr string, request *remote.RemotingCommand,
+ timeoutMillis time.Duration) error
+ CheckClientInBroker()
+ SendHeartbeatToAllBrokerWithLock()
+ UpdateTopicRouteInfo()
+ SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
+ msgs []*primitive.Message, f func(result *primitive.SendResult)) error
+ SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
+ msgs []*primitive.Message) (*primitive.SendResult, error)
+
+ ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message)
+
+ RegisterConsumer(group string, consumer InnerConsumer) error
+ UnregisterConsumer(group string)
+ PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error)
+ PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *primitive.PullResult)) error
+ RebalanceImmediately()
+ UpdatePublishInfo(topic string, data *TopicRouteData)
+}
+
+var _ RMQClient = new(rmqClient)
+
+type rmqClient struct {
option ClientOptions
// group -> InnerProducer
producerMap sync.Map
@@ -134,8 +168,8 @@ type RMQClient struct {
var clientMap sync.Map
-func GetOrNewRocketMQClient(option ClientOptions) *RMQClient {
- client := &RMQClient{
+func GetOrNewRocketMQClient(option ClientOptions) *rmqClient {
+ client := &rmqClient{
option: option,
remoteClient: remote.NewRemotingClient(),
}
@@ -147,10 +181,10 @@ func GetOrNewRocketMQClient(option ClientOptions) *RMQClient {
return nil
})
}
- return actual.(*RMQClient)
+ return actual.(*rmqClient)
}
-func (c *RMQClient) Start() {
+func (c *rmqClient) Start() {
//ctx, cancel := context.WithCancel(context.Background())
//c.cancel = cancel
c.close = false
@@ -162,7 +196,7 @@ func (c *RMQClient) Start() {
go func() {
// delay
time.Sleep(50 * time.Millisecond)
- for !c.close{
+ for !c.close {
c.UpdateTopicRouteInfo()
time.Sleep(_PullNameServerInterval)
}
@@ -170,7 +204,7 @@ func (c *RMQClient) Start() {
// TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
go func() {
- for !c.close{
+ for !c.close {
cleanOfflineBroker()
c.SendHeartbeatToAllBrokerWithLock()
time.Sleep(_HeartbeatBrokerInterval)
@@ -180,7 +214,7 @@ func (c *RMQClient) Start() {
// schedule persist offset
go func() {
//time.Sleep(10 * time.Second)
- for !c.close{
+ for !c.close {
c.consumerMap.Range(func(key, value
interface{}) bool {
consumer := value.(InnerConsumer)
consumer.PersistConsumerOffset()
@@ -191,7 +225,7 @@ func (c *RMQClient) Start() {
}()
go func() {
- for !c.close{
+ for !c.close {
c.RebalanceImmediately()
time.Sleep(_RebalanceInterval)
}
@@ -199,12 +233,12 @@ func (c *RMQClient) Start() {
})
}
-func (c *RMQClient) Shutdown() {
+func (c *rmqClient) Shutdown() {
c.remoteClient.ShutDown()
c.close = true
}
-func (c *RMQClient) ClientID() string {
+func (c *rmqClient) ClientID() string {
id := c.option.ClientIP + "@" + c.option.InstanceName
if c.option.UnitName != "" {
id += "@" + c.option.UnitName
@@ -212,7 +246,7 @@ func (c *RMQClient) ClientID() string {
return id
}
-func (c *RMQClient) InvokeSync(addr string, request *remote.RemotingCommand,
+func (c *rmqClient) InvokeSync(addr string, request *remote.RemotingCommand,
timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
if c.close {
return nil, ErrServiceState
@@ -220,7 +254,7 @@ func (c *RMQClient) InvokeSync(addr string, request
*remote.RemotingCommand,
return c.remoteClient.InvokeSync(addr, request, timeoutMillis)
}
-func (c *RMQClient) InvokeAsync(addr string, request *remote.RemotingCommand,
+func (c *rmqClient) InvokeAsync(addr string, request *remote.RemotingCommand,
timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
if c.close {
return ErrServiceState
@@ -231,7 +265,7 @@ func (c *RMQClient) InvokeAsync(addr string, request
*remote.RemotingCommand,
}
-func (c *RMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand,
+func (c *rmqClient) InvokeOneWay(addr string, request *remote.RemotingCommand,
timeoutMillis time.Duration) error {
if c.close {
return ErrServiceState
@@ -239,11 +273,11 @@ func (c *RMQClient) InvokeOneWay(addr string, request
*remote.RemotingCommand,
return c.remoteClient.InvokeOneWay(addr, request, timeoutMillis)
}
-func (c *RMQClient) CheckClientInBroker() {
+func (c *rmqClient) CheckClientInBroker() {
}
// TODO
-func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
+func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
c.hbMutex.Lock()
defer c.hbMutex.Unlock()
hbData := &heartbeatData{
@@ -301,7 +335,7 @@ func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
})
}
-func (c *RMQClient) UpdateTopicRouteInfo() {
+func (c *rmqClient) UpdateTopicRouteInfo() {
publishTopicSet := make(map[string]bool, 0)
c.producerMap.Range(func(key, value interface{}) bool {
producer := value.(InnerProducer)
@@ -328,17 +362,17 @@ func (c *RMQClient) UpdateTopicRouteInfo() {
})
for topic := range subscribedTopicSet {
- c.UpdateSubscribeInfo(topic, UpdateTopicRouteInfo(topic))
+ c.updateSubscribeInfo(topic, UpdateTopicRouteInfo(topic))
}
}
// SendMessageAsync send message with batch by async
-func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
+func (c *rmqClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
msgs []*primitive.Message, f func(result *primitive.SendResult)) error {
return nil
}
-func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
+func (c *rmqClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
msgs []*primitive.Message) (*primitive.SendResult, error) {
cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs))
err := c.remoteClient.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
@@ -348,7 +382,7 @@ func (c *RMQClient) SendMessageOneWay(ctx context.Context,
brokerAddrs string, r
return nil, err
}
-func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) {
+func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) {
var status primitive.SendStatus
switch cmd.Code {
case ResFlushDiskTimeout:
@@ -394,7 +428,7 @@ func (c *RMQClient) ProcessSendResponse(brokerName string,
cmd *remote.RemotingC
}
// PullMessage with sync
-func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error) {
+func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error) {
cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
res, err := c.remoteClient.InvokeSync(brokerAddrs, cmd, 3*time.Second)
if err != nil {
@@ -404,7 +438,7 @@ func (c *RMQClient) PullMessage(ctx context.Context,
brokerAddrs string, request
return c.processPullResponse(res)
}
-func (c *RMQClient) processPullResponse(response *remote.RemotingCommand)
(*primitive.PullResult, error) {
+func (c *rmqClient) processPullResponse(response *remote.RemotingCommand)
(*primitive.PullResult, error) {
pullResult := &primitive.PullResult{}
switch response.Code {
@@ -426,7 +460,7 @@ func (c *RMQClient) processPullResponse(response
*remote.RemotingCommand) (*prim
return pullResult, nil
}
-func (c *RMQClient) decodeCommandCustomHeader(pr *primitive.PullResult, cmd
*remote.RemotingCommand) {
+func (c *rmqClient) decodeCommandCustomHeader(pr *primitive.PullResult, cmd
*remote.RemotingCommand) {
v, exist := cmd.ExtFields["maxOffset"]
if exist {
pr.MaxOffset, _ = strconv.ParseInt(v, 10, 64)
@@ -449,34 +483,34 @@ func (c *RMQClient) decodeCommandCustomHeader(pr
*primitive.PullResult, cmd *rem
}
// PullMessageAsync pull message async
-func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *primitive.PullResult)) error {
+func (c *rmqClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *primitive.PullResult)) error {
return nil
}
-func (c *RMQClient) RegisterConsumer(group string, consumer InnerConsumer) error {
+func (c *rmqClient) RegisterConsumer(group string, consumer InnerConsumer) error {
c.consumerMap.Store(group, consumer)
return nil
}
-func (c *RMQClient) UnregisterConsumer(group string) {
+func (c *rmqClient) UnregisterConsumer(group string) {
}
-func (c *RMQClient) RegisterProducer(group string, producer InnerProducer) {
+func (c *rmqClient) RegisterProducer(group string, producer InnerProducer) {
c.producerMap.Store(group, producer)
}
-func (c *RMQClient) UnregisterProducer(group string) {
+func (c *rmqClient) UnregisterProducer(group string) {
}
-func (c *RMQClient) SelectProducer(group string) InnerProducer {
+func (c *rmqClient) SelectProducer(group string) InnerProducer {
return nil
}
-func (c *RMQClient) SelectConsumer(group string) InnerConsumer {
+func (c *rmqClient) SelectConsumer(group string) InnerConsumer {
return nil
}
-func (c *RMQClient) RebalanceImmediately() {
+func (c *rmqClient) RebalanceImmediately() {
c.consumerMap.Range(func(key, value interface{}) bool {
consumer := value.(InnerConsumer)
consumer.Rebalance()
@@ -484,7 +518,7 @@ func (c *RMQClient) RebalanceImmediately() {
})
}
-func (c *RMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
if data == nil {
return
}
@@ -500,7 +534,7 @@ func (c *RMQClient) UpdatePublishInfo(topic string, data
*TopicRouteData) {
})
}
-func (c *RMQClient) isNeedUpdatePublishInfo(topic string) bool {
+func (c *rmqClient) isNeedUpdatePublishInfo(topic string) bool {
var result bool
c.producerMap.Range(func(key, value interface{}) bool {
p := value.(InnerProducer)
@@ -513,7 +547,7 @@ func (c *RMQClient) isNeedUpdatePublishInfo(topic string)
bool {
return result
}
-func (c *RMQClient) UpdateSubscribeInfo(topic string, data *TopicRouteData) {
+func (c *rmqClient) updateSubscribeInfo(topic string, data *TopicRouteData) {
if data == nil {
return
}
@@ -528,7 +562,7 @@ func (c *RMQClient) UpdateSubscribeInfo(topic string, data
*TopicRouteData) {
})
}
-func (c *RMQClient) isNeedUpdateSubscribeInfo(topic string) bool {
+func (c *rmqClient) isNeedUpdateSubscribeInfo(topic string) bool {
var result bool
c.consumerMap.Range(func(key, value interface{}) bool {
consumer := value.(InnerConsumer)
diff --git a/internal/client_test.go b/internal/client_test.go
index aafbca7..7814ddd 100644
--- a/internal/client_test.go
+++ b/internal/client_test.go
@@ -23,7 +23,7 @@ import (
)
func TestRMQClient_PullMessage(t *testing.T) {
- client := GetOrNewRocketMQClient(ClientOption{})
+ client := GetOrNewRocketMQClient(ClientOptions{})
req := &PullMessageRequest{
ConsumerGroup: "testGroup",
Topic: "wenfeng",
diff --git a/internal/mock_client.go b/internal/mock_client.go
new file mode 100644
index 0000000..3ee3f50
--- /dev/null
+++ b/internal/mock_client.go
@@ -0,0 +1,411 @@
+// Code generated by MockGen. DO NOT EDIT.
+// Source: client.go
+
+// Package internal is a generated GoMock package.
+package internal
+
+import (
+ context "context"
+ remote "github.com/apache/rocketmq-client-go/internal/remote"
+ primitive "github.com/apache/rocketmq-client-go/primitive"
+ gomock "github.com/golang/mock/gomock"
+ reflect "reflect"
+ time "time"
+)
+
+// MockInnerProducer is a mock of InnerProducer interface
+type MockInnerProducer struct {
+ ctrl *gomock.Controller
+ recorder *MockInnerProducerMockRecorder
+}
+
+// MockInnerProducerMockRecorder is the mock recorder for MockInnerProducer
+type MockInnerProducerMockRecorder struct {
+ mock *MockInnerProducer
+}
+
+// NewMockInnerProducer creates a new mock instance
+func NewMockInnerProducer(ctrl *gomock.Controller) *MockInnerProducer {
+ mock := &MockInnerProducer{ctrl: ctrl}
+ mock.recorder = &MockInnerProducerMockRecorder{mock}
+ return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use
+func (m *MockInnerProducer) EXPECT() *MockInnerProducerMockRecorder {
+ return m.recorder
+}
+
+// PublishTopicList mocks base method
+func (m *MockInnerProducer) PublishTopicList() []string {
+ ret := m.ctrl.Call(m, "PublishTopicList")
+ ret0, _ := ret[0].([]string)
+ return ret0
+}
+
+// PublishTopicList indicates an expected call of PublishTopicList
+func (mr *MockInnerProducerMockRecorder) PublishTopicList() *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"PublishTopicList", reflect.TypeOf((*MockInnerProducer)(nil).PublishTopicList))
+}
+
+// UpdateTopicPublishInfo mocks base method
+func (m *MockInnerProducer) UpdateTopicPublishInfo(topic string, info
*TopicPublishInfo) {
+ m.ctrl.Call(m, "UpdateTopicPublishInfo", topic, info)
+}
+
+// UpdateTopicPublishInfo indicates an expected call of UpdateTopicPublishInfo
+func (mr *MockInnerProducerMockRecorder) UpdateTopicPublishInfo(topic, info
interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"UpdateTopicPublishInfo",
reflect.TypeOf((*MockInnerProducer)(nil).UpdateTopicPublishInfo), topic, info)
+}
+
+// IsPublishTopicNeedUpdate mocks base method
+func (m *MockInnerProducer) IsPublishTopicNeedUpdate(topic string) bool {
+ ret := m.ctrl.Call(m, "IsPublishTopicNeedUpdate", topic)
+ ret0, _ := ret[0].(bool)
+ return ret0
+}
+
+// IsPublishTopicNeedUpdate indicates an expected call of
IsPublishTopicNeedUpdate
+func (mr *MockInnerProducerMockRecorder) IsPublishTopicNeedUpdate(topic
interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"IsPublishTopicNeedUpdate",
reflect.TypeOf((*MockInnerProducer)(nil).IsPublishTopicNeedUpdate), topic)
+}
+
+// IsUnitMode mocks base method
+func (m *MockInnerProducer) IsUnitMode() bool {
+ ret := m.ctrl.Call(m, "IsUnitMode")
+ ret0, _ := ret[0].(bool)
+ return ret0
+}
+
+// IsUnitMode indicates an expected call of IsUnitMode
+func (mr *MockInnerProducerMockRecorder) IsUnitMode() *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnitMode",
reflect.TypeOf((*MockInnerProducer)(nil).IsUnitMode))
+}
+
+// MockInnerConsumer is a mock of InnerConsumer interface
+type MockInnerConsumer struct {
+ ctrl *gomock.Controller
+ recorder *MockInnerConsumerMockRecorder
+}
+
+// MockInnerConsumerMockRecorder is the mock recorder for MockInnerConsumer
+type MockInnerConsumerMockRecorder struct {
+ mock *MockInnerConsumer
+}
+
+// NewMockInnerConsumer creates a new mock instance
+func NewMockInnerConsumer(ctrl *gomock.Controller) *MockInnerConsumer {
+ mock := &MockInnerConsumer{ctrl: ctrl}
+ mock.recorder = &MockInnerConsumerMockRecorder{mock}
+ return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use
+func (m *MockInnerConsumer) EXPECT() *MockInnerConsumerMockRecorder {
+ return m.recorder
+}
+
+// PersistConsumerOffset mocks base method
+func (m *MockInnerConsumer) PersistConsumerOffset() {
+ m.ctrl.Call(m, "PersistConsumerOffset")
+}
+
+// PersistConsumerOffset indicates an expected call of PersistConsumerOffset
+func (mr *MockInnerConsumerMockRecorder) PersistConsumerOffset() *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"PersistConsumerOffset",
reflect.TypeOf((*MockInnerConsumer)(nil).PersistConsumerOffset))
+}
+
+// UpdateTopicSubscribeInfo mocks base method
+func (m *MockInnerConsumer) UpdateTopicSubscribeInfo(topic string, mqs
[]*primitive.MessageQueue) {
+ m.ctrl.Call(m, "UpdateTopicSubscribeInfo", topic, mqs)
+}
+
+// UpdateTopicSubscribeInfo indicates an expected call of
UpdateTopicSubscribeInfo
+func (mr *MockInnerConsumerMockRecorder) UpdateTopicSubscribeInfo(topic, mqs
interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"UpdateTopicSubscribeInfo",
reflect.TypeOf((*MockInnerConsumer)(nil).UpdateTopicSubscribeInfo), topic, mqs)
+}
+
+// IsSubscribeTopicNeedUpdate mocks base method
+func (m *MockInnerConsumer) IsSubscribeTopicNeedUpdate(topic string) bool {
+ ret := m.ctrl.Call(m, "IsSubscribeTopicNeedUpdate", topic)
+ ret0, _ := ret[0].(bool)
+ return ret0
+}
+
+// IsSubscribeTopicNeedUpdate indicates an expected call of
IsSubscribeTopicNeedUpdate
+func (mr *MockInnerConsumerMockRecorder) IsSubscribeTopicNeedUpdate(topic
interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"IsSubscribeTopicNeedUpdate",
reflect.TypeOf((*MockInnerConsumer)(nil).IsSubscribeTopicNeedUpdate), topic)
+}
+
+// SubscriptionDataList mocks base method
+func (m *MockInnerConsumer) SubscriptionDataList() []*SubscriptionData {
+ ret := m.ctrl.Call(m, "SubscriptionDataList")
+ ret0, _ := ret[0].([]*SubscriptionData)
+ return ret0
+}
+
+// SubscriptionDataList indicates an expected call of SubscriptionDataList
+func (mr *MockInnerConsumerMockRecorder) SubscriptionDataList() *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"SubscriptionDataList",
reflect.TypeOf((*MockInnerConsumer)(nil).SubscriptionDataList))
+}
+
+// Rebalance mocks base method
+func (m *MockInnerConsumer) Rebalance() {
+ m.ctrl.Call(m, "Rebalance")
+}
+
+// Rebalance indicates an expected call of Rebalance
+func (mr *MockInnerConsumerMockRecorder) Rebalance() *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Rebalance",
reflect.TypeOf((*MockInnerConsumer)(nil).Rebalance))
+}
+
+// IsUnitMode mocks base method
+func (m *MockInnerConsumer) IsUnitMode() bool {
+ ret := m.ctrl.Call(m, "IsUnitMode")
+ ret0, _ := ret[0].(bool)
+ return ret0
+}
+
+// IsUnitMode indicates an expected call of IsUnitMode
+func (mr *MockInnerConsumerMockRecorder) IsUnitMode() *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnitMode",
reflect.TypeOf((*MockInnerConsumer)(nil).IsUnitMode))
+}
+
+// MockRMQClient is a mock of RMQClient interface
+type MockRMQClient struct {
+ ctrl *gomock.Controller
+ recorder *MockRMQClientMockRecorder
+}
+
+// MockRMQClientMockRecorder is the mock recorder for MockRMQClient
+type MockRMQClientMockRecorder struct {
+ mock *MockRMQClient
+}
+
+// NewMockRMQClient creates a new mock instance
+func NewMockRMQClient(ctrl *gomock.Controller) *MockRMQClient {
+ mock := &MockRMQClient{ctrl: ctrl}
+ mock.recorder = &MockRMQClientMockRecorder{mock}
+ return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use
+func (m *MockRMQClient) EXPECT() *MockRMQClientMockRecorder {
+ return m.recorder
+}
+
+// Start mocks base method
+func (m *MockRMQClient) Start() {
+ m.ctrl.Call(m, "Start")
+}
+
+// Start indicates an expected call of Start
+func (mr *MockRMQClientMockRecorder) Start() *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start",
reflect.TypeOf((*MockRMQClient)(nil).Start))
+}
+
+// Shutdown mocks base method
+func (m *MockRMQClient) Shutdown() {
+ m.ctrl.Call(m, "Shutdown")
+}
+
+// Shutdown indicates an expected call of Shutdown
+func (mr *MockRMQClientMockRecorder) Shutdown() *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Shutdown",
reflect.TypeOf((*MockRMQClient)(nil).Shutdown))
+}
+
+// ClientID mocks base method
+func (m *MockRMQClient) ClientID() string {
+ ret := m.ctrl.Call(m, "ClientID")
+ ret0, _ := ret[0].(string)
+ return ret0
+}
+
+// ClientID indicates an expected call of ClientID
+func (mr *MockRMQClientMockRecorder) ClientID() *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientID",
reflect.TypeOf((*MockRMQClient)(nil).ClientID))
+}
+
+// RegisterProducer mocks base method
+func (m *MockRMQClient) RegisterProducer(group string, producer InnerProducer)
{
+ m.ctrl.Call(m, "RegisterProducer", group, producer)
+}
+
+// RegisterProducer indicates an expected call of RegisterProducer
+func (mr *MockRMQClientMockRecorder) RegisterProducer(group, producer
interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"RegisterProducer", reflect.TypeOf((*MockRMQClient)(nil).RegisterProducer),
group, producer)
+}
+
+// InvokeSync mocks base method
+func (m *MockRMQClient) InvokeSync(addr string, request
*remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand,
error) {
+ ret := m.ctrl.Call(m, "InvokeSync", addr, request, timeoutMillis)
+ ret0, _ := ret[0].(*remote.RemotingCommand)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// InvokeSync indicates an expected call of InvokeSync
+func (mr *MockRMQClientMockRecorder) InvokeSync(addr, request, timeoutMillis
interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync",
reflect.TypeOf((*MockRMQClient)(nil).InvokeSync), addr, request, timeoutMillis)
+}
+
+// InvokeAsync mocks base method
+func (m *MockRMQClient) InvokeAsync(addr string, request
*remote.RemotingCommand, timeoutMillis time.Duration, f
func(*remote.RemotingCommand, error)) error {
+ ret := m.ctrl.Call(m, "InvokeAsync", addr, request, timeoutMillis, f)
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// InvokeAsync indicates an expected call of InvokeAsync
+func (mr *MockRMQClientMockRecorder) InvokeAsync(addr, request, timeoutMillis,
f interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync",
reflect.TypeOf((*MockRMQClient)(nil).InvokeAsync), addr, request,
timeoutMillis, f)
+}
+
+// InvokeOneWay mocks base method
+func (m *MockRMQClient) InvokeOneWay(addr string, request
*remote.RemotingCommand, timeoutMillis time.Duration) error {
+ ret := m.ctrl.Call(m, "InvokeOneWay", addr, request, timeoutMillis)
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// InvokeOneWay indicates an expected call of InvokeOneWay
+func (mr *MockRMQClientMockRecorder) InvokeOneWay(addr, request, timeoutMillis
interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay",
reflect.TypeOf((*MockRMQClient)(nil).InvokeOneWay), addr, request,
timeoutMillis)
+}
+
+// CheckClientInBroker mocks base method
+func (m *MockRMQClient) CheckClientInBroker() {
+ m.ctrl.Call(m, "CheckClientInBroker")
+}
+
+// CheckClientInBroker indicates an expected call of CheckClientInBroker
+func (mr *MockRMQClientMockRecorder) CheckClientInBroker() *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"CheckClientInBroker",
reflect.TypeOf((*MockRMQClient)(nil).CheckClientInBroker))
+}
+
+// SendHeartbeatToAllBrokerWithLock mocks base method
+func (m *MockRMQClient) SendHeartbeatToAllBrokerWithLock() {
+ m.ctrl.Call(m, "SendHeartbeatToAllBrokerWithLock")
+}
+
+// SendHeartbeatToAllBrokerWithLock indicates an expected call of
SendHeartbeatToAllBrokerWithLock
+func (mr *MockRMQClientMockRecorder) SendHeartbeatToAllBrokerWithLock()
*gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"SendHeartbeatToAllBrokerWithLock",
reflect.TypeOf((*MockRMQClient)(nil).SendHeartbeatToAllBrokerWithLock))
+}
+
+// UpdateTopicRouteInfo mocks base method
+func (m *MockRMQClient) UpdateTopicRouteInfo() {
+ m.ctrl.Call(m, "UpdateTopicRouteInfo")
+}
+
+// UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo
+func (mr *MockRMQClientMockRecorder) UpdateTopicRouteInfo() *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"UpdateTopicRouteInfo",
reflect.TypeOf((*MockRMQClient)(nil).UpdateTopicRouteInfo))
+}
+
+// SendMessageAsync mocks base method
+func (m *MockRMQClient) SendMessageAsync(ctx context.Context, brokerAddrs,
brokerName string, request *SendMessageRequest, msgs []*primitive.Message, f
func(*primitive.SendResult)) error {
+ ret := m.ctrl.Call(m, "SendMessageAsync", ctx, brokerAddrs, brokerName,
request, msgs, f)
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// SendMessageAsync indicates an expected call of SendMessageAsync
+func (mr *MockRMQClientMockRecorder) SendMessageAsync(ctx, brokerAddrs,
brokerName, request, msgs, f interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"SendMessageAsync", reflect.TypeOf((*MockRMQClient)(nil).SendMessageAsync),
ctx, brokerAddrs, brokerName, request, msgs, f)
+}
+
+// SendMessageOneWay mocks base method
+func (m *MockRMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs
string, request *SendMessageRequest, msgs []*primitive.Message)
(*primitive.SendResult, error) {
+ ret := m.ctrl.Call(m, "SendMessageOneWay", ctx, brokerAddrs, request,
msgs)
+ ret0, _ := ret[0].(*primitive.SendResult)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// SendMessageOneWay indicates an expected call of SendMessageOneWay
+func (mr *MockRMQClientMockRecorder) SendMessageOneWay(ctx, brokerAddrs,
request, msgs interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"SendMessageOneWay", reflect.TypeOf((*MockRMQClient)(nil).SendMessageOneWay),
ctx, brokerAddrs, request, msgs)
+}
+
+// ProcessSendResponse mocks base method
+func (m *MockRMQClient) ProcessSendResponse(brokerName string, cmd
*remote.RemotingCommand, resp *primitive.SendResult, msgs
...*primitive.Message) {
+ varargs := []interface{}{brokerName, cmd, resp}
+ for _, a := range msgs {
+ varargs = append(varargs, a)
+ }
+ m.ctrl.Call(m, "ProcessSendResponse", varargs...)
+}
+
+// ProcessSendResponse indicates an expected call of ProcessSendResponse
+func (mr *MockRMQClientMockRecorder) ProcessSendResponse(brokerName, cmd, resp
interface{}, msgs ...interface{}) *gomock.Call {
+ varargs := append([]interface{}{brokerName, cmd, resp}, msgs...)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"ProcessSendResponse",
reflect.TypeOf((*MockRMQClient)(nil).ProcessSendResponse), varargs...)
+}
+
+// RegisterConsumer mocks base method
+func (m *MockRMQClient) RegisterConsumer(group string, consumer InnerConsumer)
error {
+ ret := m.ctrl.Call(m, "RegisterConsumer", group, consumer)
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// RegisterConsumer indicates an expected call of RegisterConsumer
+func (mr *MockRMQClientMockRecorder) RegisterConsumer(group, consumer
interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"RegisterConsumer", reflect.TypeOf((*MockRMQClient)(nil).RegisterConsumer),
group, consumer)
+}
+
+// UnregisterConsumer mocks base method
+func (m *MockRMQClient) UnregisterConsumer(group string) {
+ m.ctrl.Call(m, "UnregisterConsumer", group)
+}
+
+// UnregisterConsumer indicates an expected call of UnregisterConsumer
+func (mr *MockRMQClientMockRecorder) UnregisterConsumer(group interface{})
*gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"UnregisterConsumer", reflect.TypeOf((*MockRMQClient)(nil).UnregisterConsumer),
group)
+}
+
+// PullMessage mocks base method
+func (m *MockRMQClient) PullMessage(ctx context.Context, brokerAddrs string,
request *PullMessageRequest) (*primitive.PullResult, error) {
+ ret := m.ctrl.Call(m, "PullMessage", ctx, brokerAddrs, request)
+ ret0, _ := ret[0].(*primitive.PullResult)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// PullMessage indicates an expected call of PullMessage
+func (mr *MockRMQClientMockRecorder) PullMessage(ctx, brokerAddrs, request
interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullMessage",
reflect.TypeOf((*MockRMQClient)(nil).PullMessage), ctx, brokerAddrs, request)
+}
+
+// PullMessageAsync mocks base method
+func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs
string, request *PullMessageRequest, f func(*primitive.PullResult)) error {
+ ret := m.ctrl.Call(m, "PullMessageAsync", ctx, brokerAddrs, request, f)
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// PullMessageAsync indicates an expected call of PullMessageAsync
+func (mr *MockRMQClientMockRecorder) PullMessageAsync(ctx, brokerAddrs,
request, f interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"PullMessageAsync", reflect.TypeOf((*MockRMQClient)(nil).PullMessageAsync),
ctx, brokerAddrs, request, f)
+}
+
+// RebalanceImmediately mocks base method
+func (m *MockRMQClient) RebalanceImmediately() {
+ m.ctrl.Call(m, "RebalanceImmediately")
+}
+
+// RebalanceImmediately indicates an expected call of RebalanceImmediately
+func (mr *MockRMQClientMockRecorder) RebalanceImmediately() *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"RebalanceImmediately",
reflect.TypeOf((*MockRMQClient)(nil).RebalanceImmediately))
+}
+
+// UpdatePublishInfo mocks base method
+func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+ m.ctrl.Call(m, "UpdatePublishInfo", topic, data)
+}
+
+// UpdatePublishInfo indicates an expected call of UpdatePublishInfo
+func (mr *MockRMQClientMockRecorder) UpdatePublishInfo(topic, data
interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"UpdatePublishInfo", reflect.TypeOf((*MockRMQClient)(nil).UpdatePublishInfo),
topic, data)
+}
diff --git a/internal/route.go b/internal/route.go
index 074cf90..dbb39c1 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -157,6 +157,13 @@ func UpdateTopicRouteInfo(topic string) *TopicRouteData {
return routeData.clone()
}
+// AddBroker stores every entry of routeData.BrokerDataList in brokerAddressesMap, keyed by its BrokerName. It exists only so tests can seed broker data; routeData must be non-nil.
+func AddBroker(routeData *TopicRouteData) {
+ for _, brokerData := range routeData.BrokerDataList {
+ brokerAddressesMap.Store(brokerData.BrokerName, brokerData)
+ }
+}
+
func FindBrokerAddrByTopic(topic string) string {
v, exist := routeDataMap.Load(topic)
if !exist {
diff --git a/producer/producer.go b/producer/producer.go
index 47c4026..ab501de 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -33,6 +33,7 @@ import (
var (
ErrTopicEmpty = errors.New("topic is nil")
ErrMessageEmpty = errors.New("message is nil")
+ ErrNotRunning = errors.New("producer not started") // returned by checkMsg when the producer state is not internal.StateRunning
)
func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
@@ -84,7 +85,7 @@ func getChainedInterceptor(interceptors
[]primitive.Interceptor, cur int, finalI
type defaultProducer struct {
group string
- client *internal.RMQClient
+ client internal.RMQClient
state internal.ServiceState
options producerOptions
publishInfo sync.Map
@@ -100,10 +101,16 @@ func (p *defaultProducer) Start() error {
}
func (p *defaultProducer) Shutdown() error { // Shutdown marks the producer as shut down and shuts down its client.
+ p.state = internal.StateShutdown // checkMsg returns ErrNotRunning once the state is no longer StateRunning
+ p.client.Shutdown()
return nil // this method never reports an error
}
func (p *defaultProducer) checkMsg(msg *primitive.Message) error {
+ if p.state != internal.StateRunning {
+ return ErrNotRunning
+ }
+
if msg == nil {
return errors.New("message is nil")
}
@@ -242,6 +249,7 @@ func (p *defaultProducer) sendOneWay(ctx context.Context,
msg *primitive.Message
err = _err
continue
}
+ return nil
}
return err
}
diff --git a/producer/producer_test.go b/producer/producer_test.go
new file mode 100644
index 0000000..0dd564f
--- /dev/null
+++ b/producer/producer_test.go
@@ -0,0 +1,234 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package producer
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/apache/rocketmq-client-go/internal"
+ "github.com/apache/rocketmq-client-go/internal/remote"
+ "github.com/apache/rocketmq-client-go/primitive"
+ "github.com/golang/mock/gomock"
+ "github.com/stretchr/testify/assert"
+)
+
+const (
+ topic = "TopicTest" // topic name shared by the test messages and the fixtures stored by mockB4Send
+)
+
+func TestShutdown(t *testing.T) { // TestShutdown checks that SendSync, SendOneWay and SendAsync all fail with ErrNotRunning after Shutdown.
+ p, _ := NewDefaultProducer( // NOTE(review): the constructor error is discarded here
+ WithNameServer([]string{"127.0.0.1:9876"}),
+ WithRetry(2),
+ WithQueueSelector(NewManualQueueSelector()),
+ )
+
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+ client := internal.NewMockRMQClient(ctrl)
+ p.client = client // replace the producer's client with the gomock mock
+
+ client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return() // calls the test expects p.Start() to make on the client
+ client.EXPECT().Start().Return()
+ err := p.Start()
+ assert.Nil(t, err)
+
+ client.EXPECT().Shutdown().Return() // p.Shutdown() forwards to client.Shutdown()
+ err = p.Shutdown()
+ assert.Nil(t, err)
+
+ ctx := context.Background()
+ msg := new(primitive.Message) // zero-value message; every send below is expected to fail with ErrNotRunning
+
+ r, err := p.SendSync(ctx, msg)
+ assert.Equal(t, ErrNotRunning, err)
+ assert.Nil(t, r)
+
+ err = p.SendOneWay(ctx, msg)
+ assert.Equal(t, ErrNotRunning, err)
+ assert.Nil(t, r) // NOTE(review): r is still the SendSync result; SendOneWay only returns an error
+
+ f := func(context.Context, *primitive.SendResult, error) {
+ assert.False(t, true, "should not come in") // always fails: the callback must not run on a stopped producer
+ }
+ err = p.SendAsync(ctx, f, msg)
+ assert.Equal(t, ErrNotRunning, err)
+ assert.Nil(t, r) // NOTE(review): r is still the SendSync result; SendAsync only returns an error
+}
+
+func mockB4Send(p *defaultProducer) { // mockB4Send stores publish info for topic on p and registers the matching broker via internal.AddBroker.
+ p.publishInfo.Store(topic, &internal.TopicPublishInfo{
+ HaveTopicRouterInfo: true,
+ MqList: []*primitive.MessageQueue{
+ {
+ Topic: topic,
+ BrokerName: "aa", // same broker name as the BrokerData registered below
+ QueueId: 0,
+ },
+ },
+ })
+ internal.AddBroker(&internal.TopicRouteData{
+ BrokerDataList: []*internal.BrokerData{
+ {
+ Cluster: "cluster",
+ BrokerName: "aa",
+ BrokerAddresses: map[int64]string{
+ 0: "1", // presumably broker id 0 mapped to a placeholder address — confirm
+ },
+ },
+ },
+ })
+}
+
+func TestSync(t *testing.T) { // TestSync checks that SendSync returns the SendResult filled in by the mocked ProcessSendResponse.
+ p, _ := NewDefaultProducer( // NOTE(review): the constructor error is discarded here
+ WithNameServer([]string{"127.0.0.1:9876"}),
+ WithRetry(2),
+ WithQueueSelector(NewManualQueueSelector()),
+ )
+
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+ client := internal.NewMockRMQClient(ctrl)
+ p.client = client // replace the producer's client with the gomock mock
+
+ client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return() // calls the test expects p.Start() to make on the client
+ client.EXPECT().Start().Return()
+ err := p.Start()
+ assert.Nil(t, err)
+
+ ctx := context.Background()
+ msg := &primitive.Message{
+ Topic: topic,
+ Body: []byte("this is a message body"),
+ Properties: map[string]string{"key": "value"},
+ QueueID: 0,
+ }
+
+ expectedResp := &primitive.SendResult{
+ Status: primitive.SendOK,
+ MsgID: "111",
+ QueueOffset: 0,
+ OffsetMsgID: "0",
+ }
+
+ mockB4Send(p) // store publish info and broker data for topic before sending
+
+ client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(),
gomock.Any()).Return(nil, nil)
+ client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any()).Do(
+ func(brokerName string, cmd *remote.RemotingCommand, resp
*primitive.SendResult, msgs ...*primitive.Message) {
+ resp.Status = expectedResp.Status // copy the expected fields into the SendResult passed to the mock
+ resp.MsgID = expectedResp.MsgID
+ resp.QueueOffset = expectedResp.QueueOffset
+ resp.OffsetMsgID = expectedResp.OffsetMsgID
+ })
+ resp, err := p.SendSync(ctx, msg)
+ assert.Nil(t, err)
+ assert.Equal(t, expectedResp, resp)
+}
+
+func TestASync(t *testing.T) { // TestASync checks that SendAsync hands the SendResult filled in by the mocked ProcessSendResponse to the user callback.
+ p, _ := NewDefaultProducer( // NOTE(review): the constructor error is discarded here
+ WithNameServer([]string{"127.0.0.1:9876"}),
+ WithRetry(2),
+ WithQueueSelector(NewManualQueueSelector()),
+ )
+
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+ client := internal.NewMockRMQClient(ctrl)
+ p.client = client // replace the producer's client with the gomock mock
+
+ client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return() // calls the test expects p.Start() to make on the client
+ client.EXPECT().Start().Return()
+ err := p.Start()
+ assert.Nil(t, err)
+
+ ctx := context.Background()
+ msg := &primitive.Message{
+ Topic: topic,
+ Body: []byte("this is a message body"),
+ Properties: map[string]string{"key": "value"},
+ }
+
+ expectedResp := &primitive.SendResult{
+ Status: primitive.SendOK,
+ MsgID: "111",
+ QueueOffset: 0,
+ OffsetMsgID: "0",
+ }
+
+ f := func(ctx context.Context, resp *primitive.SendResult, err error) { // user callback: expects no error and the expected result
+ assert.Nil(t, err)
+ assert.Equal(t, expectedResp, resp)
+ }
+
+ mockB4Send(p) // store publish info and broker data for topic before sending
+
+ client.EXPECT().InvokeAsync(gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).DoAndReturn(
+ func(addr string, request *remote.RemotingCommand,
+ timeoutMillis time.Duration, f
func(*remote.RemotingCommand, error)) error {
+ // mock invoke callback: the completion callback is run synchronously with a nil command and a nil error
+ f(nil, nil)
+ return nil
+ })
+ client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any()).Do(
+ func(brokerName string, cmd *remote.RemotingCommand, resp
*primitive.SendResult, msgs ...*primitive.Message) {
+ resp.Status = expectedResp.Status // copy the expected fields into the SendResult passed to the mock
+ resp.MsgID = expectedResp.MsgID
+ resp.QueueOffset = expectedResp.QueueOffset
+ resp.OffsetMsgID = expectedResp.OffsetMsgID
+ })
+
+ err = p.SendAsync(ctx, f, msg)
+ assert.Nil(t, err)
+}
+
+func TestOneway(t *testing.T) { // TestOneway checks that SendOneWay returns nil when the mocked InvokeOneWay returns nil.
+ p, _ := NewDefaultProducer( // NOTE(review): the constructor error is discarded here
+ WithNameServer([]string{"127.0.0.1:9876"}),
+ WithRetry(2),
+ WithQueueSelector(NewManualQueueSelector()),
+ )
+
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+ client := internal.NewMockRMQClient(ctrl)
+ p.client = client // replace the producer's client with the gomock mock
+
+ client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return() // calls the test expects p.Start() to make on the client
+ client.EXPECT().Start().Return()
+ err := p.Start()
+ assert.Nil(t, err)
+
+ ctx := context.Background()
+ msg := &primitive.Message{
+ Topic: topic,
+ Body: []byte("this is a message body"),
+ Properties: map[string]string{"key": "value"},
+ }
+
+ mockB4Send(p) // store publish info and broker data for topic before sending
+
+ client.EXPECT().InvokeOneWay(gomock.Any(), gomock.Any(),
gomock.Any()).Return(nil).AnyTimes() // AnyTimes: zero or more InvokeOneWay calls are accepted
+
+ err = p.SendOneWay(ctx, msg)
+ assert.Nil(t, err)
+}
diff --git a/producer/selector_test.go b/producer/selector_test.go
index 016db07..723c55b 100644
--- a/producer/selector_test.go
+++ b/producer/selector_test.go
@@ -20,6 +20,7 @@ package producer
import (
"testing"
+ "github.com/apache/rocketmq-client-go/primitive"
"github.com/stretchr/testify/assert"
)
@@ -27,10 +28,10 @@ func TestRoundRobin(t *testing.T) {
queues := 10
s := NewRoundRobinQueueSelector()
- m := &Message{
+ m := &primitive.Message{
Topic: "test",
}
- mrr := &Message{
+ mrr := &primitive.Message{
Topic: "rr",
}
for i := 0; i < 100; i++ {