This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new f14b760 Refactoring Producer (#58)
f14b760 is described below
commit f14b7607bf1ef1fcc8cd5f9fb97418b1cbd8c95d
Author: wenfeng <[email protected]>
AuthorDate: Fri May 10 13:51:56 2019 +0800
Refactoring Producer (#58)
* refactor producer
* fix msg decode bug
* make name srv configurable
---
common/CommonConstant.go | 2 +-
consumer/consumer.go | 2 +
consumer/push_consumer.go | 22 +++-
examples/consumer/main.go | 16 +--
examples/{consumer => producer}/main.go | 45 +++----
go.mod | 13 +-
go.sum | 16 +--
kernel/client.go | 92 ++++++++-------
kernel/model.go | 4 +-
kernel/request.go | 39 +++---
kernel/route.go | 16 +--
producer.go | 189 -----------------------------
producer/producer.go | 203 ++++++++++++++++++++++++++++++++
producer_test.go | 12 --
utils/{messagesysflag.go => fun.go} | 28 -----
utils/messagesysflag.go | 6 +-
16 files changed, 344 insertions(+), 361 deletions(-)
diff --git a/common/CommonConstant.go b/common/CommonConstant.go
index bd4e7ce..754114c 100644
--- a/common/CommonConstant.go
+++ b/common/CommonConstant.go
@@ -5,4 +5,4 @@ const (
SYNC
ASYNC
ONEWAY
-)
\ No newline at end of file
+)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index a6d9500..91da8cf 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -194,6 +194,8 @@ func (pr *PullRequest) String() string {
type ConsumerOption struct {
kernel.ClientOption
+ NameServerAddr string
+
/**
* Backtracking consumption time with second precision. Time format is
* 20131223171201<br>
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 2039be1..abc087e 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/rocketmq-client-go/rlog"
"github.com/apache/rocketmq-client-go/utils"
"math"
+ "os"
"strconv"
"time"
)
@@ -63,6 +64,13 @@ type pushConsumer struct {
func NewPushConsumer(consumerGroup string, opt ConsumerOption) PushConsumer {
opt.InstanceName = "DEFAULT"
opt.ClientIP = utils.LocalIP()
+ if opt.NameServerAddr == "" {
+ rlog.Fatal("opt.NameServerAddr can't be empty")
+ }
+ err := os.Setenv(kernel.EnvNameServerAddr, opt.NameServerAddr)
+ if err != nil {
+ rlog.Fatal("set env=EnvNameServerAddr error: %s ", err.Error())
+ }
dc := &defaultConsumer{
consumerGroup: consumerGroup,
cType: _PushConsume,
@@ -451,7 +459,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
pq.putMessage(msgFounded...)
}
if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
- rlog.Warnf("[BUG] pull message result maybe data wrong, [nextBeginOffset=%s, "+
+ rlog.Warnf("[BUG] pull message result maybe data wrong, [nextBeginOffset=%d, "+
"firstMsgOffset=%d, prevRequestOffset=%d]", result.NextBeginOffset, firstMsgOffset, prevRequestOffset)
}
case kernel.PullNoNewMsg:
@@ -607,12 +615,14 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *kernel.Mes
groupTopic := kernel.RetryGroupTopicPrefix + pc.consumerGroup
for idx := range subMsgs {
msg := subMsgs[idx]
- retryTopic := msg.Properties[kernel.PropertyRetryTopic]
- if retryTopic == "" && groupTopic == msg.Topic {
- msg.Topic = retryTopic
+ if msg.Properties != nil {
+ retryTopic := msg.Properties[kernel.PropertyRetryTopic]
+ if retryTopic == "" && groupTopic == msg.Topic {
+ msg.Topic = retryTopic
+ }
+ subMsgs[idx].Properties[kernel.PropertyConsumeStartTime] = strconv.FormatInt(
+ beginTime.UnixNano()/int64(time.Millisecond), 10)
}
- subMsgs[idx].Properties[kernel.PropertyConsumeStartTime] = strconv.FormatInt(
- beginTime.UnixNano()/int64(time.Millisecond), 10)
}
result, err := pc.consume(ctx, subMsgs)
consumeRT := time.Now().Sub(beginTime)
diff --git a/examples/consumer/main.go b/examples/consumer/main.go
index 2b563ff..38e780e 100644
--- a/examples/consumer/main.go
+++ b/examples/consumer/main.go
@@ -22,22 +22,22 @@ import (
"github.com/apache/rocketmq-client-go/consumer"
"github.com/apache/rocketmq-client-go/kernel"
"os"
- "sync/atomic"
"time"
)
func main() {
c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
- ConsumerModel: consumer.Clustering,
- FromWhere: consumer.ConsumeFromFirstOffset,
+ NameServerAddr: "127.0.0.1:9876",
+ ConsumerModel: consumer.Clustering,
+ FromWhere: consumer.ConsumeFromFirstOffset,
})
- var count int64
+ //var count int64
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
- c := atomic.AddInt64(&count, int64(len(msgs)))
- if c%1000 == 0 {
- fmt.Println(c)
- }
+ //c := atomic.AddInt64(&count, int64(len(msgs)))
+ //if c%1000 == 0 {
+ fmt.Println(msgs)
+ //}
return consumer.ConsumeSuccess, nil
})
if err != nil {
diff --git a/examples/consumer/main.go b/examples/producer/main.go
similarity index 57%
copy from examples/consumer/main.go
copy to examples/producer/main.go
index 2b563ff..83bd127 100644
--- a/examples/consumer/main.go
+++ b/examples/producer/main.go
@@ -18,35 +18,38 @@ limitations under the License.
package main
import (
+ "context"
"fmt"
- "github.com/apache/rocketmq-client-go/consumer"
"github.com/apache/rocketmq-client-go/kernel"
+ "github.com/apache/rocketmq-client-go/producer"
"os"
- "sync/atomic"
- "time"
)
func main() {
- c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
- ConsumerModel: consumer.Clustering,
- FromWhere: consumer.ConsumeFromFirstOffset,
- })
- var count int64
- err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx
*consumer.ConsumeMessageContext,
- msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
- c := atomic.AddInt64(&count, int64(len(msgs)))
- if c%1000 == 0 {
- fmt.Println(c)
- }
- return consumer.ConsumeSuccess, nil
- })
+ opt := producer.ProducerOptions{
+ NameServerAddr: "127.0.0.1:9876",
+ RetryTimesWhenSendFailed: 2,
+ }
+ p := producer.NewProducer(opt)
+ err := p.Start()
if err != nil {
- fmt.Println(err.Error())
+ fmt.Printf("start producer error: %s", err.Error())
+ os.Exit(1)
+ }
+ for i := 0; i < 1000; i++ {
+ res, err := p.SendSync(context.Background(), &kernel.Message{
+ Topic: "test",
+ Body: []byte("Hello RocketMQ Go Client!"),
+ })
+
+ if err != nil {
+ fmt.Printf("send message error: %s\n", err)
+ } else {
+ fmt.Printf("send message success: result=%s\n", res.String())
+ }
}
- err = c.Start()
+ err = p.Shutdown()
if err != nil {
- fmt.Println(err.Error())
- os.Exit(-1)
+ fmt.Printf("shundown producer error: %s", err.Error())
}
- time.Sleep(time.Hour)
}
diff --git a/go.mod b/go.mod
index 419d8ba..f8a9967 100644
--- a/go.mod
+++ b/go.mod
@@ -1,19 +1,12 @@
module github.com/apache/rocketmq-client-go
-go 1.11
+go 1.12
require (
github.com/emirpasic/gods v1.12.0
- github.com/sirupsen/logrus v1.3.0
+ github.com/sirupsen/logrus v1.4.1
github.com/stretchr/testify v1.3.0
github.com/tidwall/gjson v1.2.1
github.com/tidwall/match v1.0.1 // indirect
- github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51 // indirect
-)
-
-replace (
- golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 =>
github.com/golang/crypto v0.0.0-20180904163835-0709b304e793
- golang.org/x/net v0.0.0-20180821023952-922f4815f713 =>
github.com/golang/net v0.0.0-20180826012351-8a410e7b638d
- golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 =>
github.com/golang/sys v0.0.0-20180905080454-ebe1bf3edb33
- golang.org/x/text v0.3.0 => github.com/golang/text v0.3.0
+ github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 // indirect
)
diff --git a/go.sum b/go.sum
index 61c5e19..07969a4 100644
--- a/go.sum
+++ b/go.sum
@@ -1,28 +1,20 @@
github.com/davecgh/go-spew v1.1.0/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/davecgh/go-spew v1.1.1
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/golang/crypto v0.0.0-20180904163835-0709b304e793/go.mod
h1:uZvAcrsnNaCxlh1HorK5dUQHGmEKPh2H/Rl1kehswPo=
-github.com/golang/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod
h1:5JyrLPvD/ZdaYkT7IqKhsP5xt7aLjA99KXRtk4EIYDk=
github.com/emirpasic/gods v1.12.0
h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
github.com/emirpasic/gods v1.12.0/go.mod
h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
-github.com/konsorten/go-windows-terminal-sequences v1.0.1
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
-github.com/pmezard/go-difflib v1.0.0
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/sirupsen/logrus v1.3.0
h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
-github.com/sirupsen/logrus v1.3.0/go.mod
h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.1
h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
+github.com/sirupsen/logrus v1.4.1/go.mod
h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/stretchr/objx v0.1.0/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
-github.com/stretchr/testify v1.3.0
h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/tidwall/gjson v1.2.1 h1:j0efZLrZUvNerEf6xqoi0NjWMK5YlLrR7Guo/dxY174=
github.com/tidwall/gjson v1.2.1/go.mod
h1:c/nTNbUr0E0OrXEhq1pwa8iEgc2DOt4ZZqAt1HtCkPA=
github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
github.com/tidwall/match v1.0.1/go.mod
h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
-github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51
h1:BP2bjP495BBPaBcS5rmqviTfrOkN5rO5ceKAMRZCRFc=
-github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod
h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
-golang.org/x/crypto v0.0.0-20180904163835-0709b304e793
h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
-golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65
h1:rQ229MBgvW68s1/g6f1/63TgYwYxfF4E+bi/KC19P8g=
+github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65/go.mod
h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33
h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/kernel/client.go b/kernel/client.go
index ae258e4..2736d6a 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -52,8 +52,19 @@ const (
var (
ErrServiceState = errors.New("service state is not running, please check")
+
+ _VIPChannelEnable = false
)
+func init() {
+ if os.Getenv("com.rocketmq.sendMessageWithVIPChannel") != "" {
+ value, err := strconv.ParseBool(os.Getenv("com.rocketmq.sendMessageWithVIPChannel"))
+ if err == nil {
+ _VIPChannelEnable = value
+ }
+ }
+}
+
type ClientOption struct {
NameServerAddr string
ClientIP string
@@ -71,8 +82,8 @@ func (opt *ClientOption) ChangeInstanceNameToPID() {
}
func (opt *ClientOption) String() string {
- return fmt.Sprintf("ClientOption [NameServerAddr=%s, ClientIP=%s, InstanceName=%s, "+
- "UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v, UseTLS=%v]", opt.NameServerAddr, opt.ClientIP,
+ return fmt.Sprintf("ClientOption [ClientIP=%s, InstanceName=%s, "+
+ "UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v, UseTLS=%v]", opt.ClientIP,
opt.InstanceName, opt.UnitMode, opt.UnitName, opt.VIPChannelEnabled, opt.UseTLS)
}
@@ -80,9 +91,8 @@ type InnerProducer interface {
PublishTopicList() []string
UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
IsPublishTopicNeedUpdate(topic string) bool
- GetCheckListener() func(msg *MessageExt)
- GetTransactionListener() TransactionListener
- isUnitMode() bool
+ //GetTransactionListener() TransactionListener
+ IsUnitMode() bool
}
type InnerConsumer interface {
@@ -104,6 +114,7 @@ type RMQClient struct {
once sync.Once
remoteClient *remote.RemotingClient
+ hbMutex sync.Mutex
}
var clientMap sync.Map
@@ -193,6 +204,8 @@ func (c *RMQClient) CheckClientInBroker() {
// TODO
func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
+ c.hbMutex.Lock()
+ defer c.hbMutex.Unlock()
hbData := &heartbeatData{
ClientId: c.ClientID(),
}
@@ -279,27 +292,6 @@ func (c *RMQClient) UpdateTopicRouteInfo() {
}
}
-// SendMessage with batch by sync
-func (c *RMQClient) SendMessageSync(ctx context.Context, brokerAddrs,
brokerName string, request *SendMessageRequest,
- msgs []*Message) (*SendResult, error) {
- cmd := getRemotingCommand(request, msgs)
- response, err := c.remoteClient.InvokeSync(brokerAddrs, cmd,
3*time.Second)
- if err != nil {
- rlog.Warnf("send messages with sync error: %v", err)
- return nil, err
- }
-
- return c.processSendResponse(brokerName, msgs, response), nil
-}
-
-func getRemotingCommand(request *SendMessageRequest, msgs []*Message)
*remote.RemotingCommand {
- if request.Batch {
- return remote.NewRemotingCommand(ReqSendBatchMessage, request,
encodeMessages(msgs))
- } else {
- return remote.NewRemotingCommand(ReqSendMessage, request,
encodeMessages(msgs))
- }
-}
-
// SendMessageAsync send message with batch by async
func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs,
brokerName string, request *SendMessageRequest,
msgs []*Message, f func(result *SendResult)) error {
@@ -316,7 +308,7 @@ func (c *RMQClient) SendMessageOneWay(ctx context.Context,
brokerAddrs string, r
return nil, err
}
-func (c *RMQClient) processSendResponse(brokerName string, msgs []*Message,
cmd *remote.RemotingCommand) *SendResult {
+func (c *RMQClient) ProcessSendResponse(brokerName string, cmd
*remote.RemotingCommand, msgs ...*Message) *SendResult {
var status SendStatus
switch cmd.Code {
case ResFlushDiskTimeout:
@@ -331,9 +323,6 @@ func (c *RMQClient) processSendResponse(brokerName string,
msgs []*Message, cmd
// TODO process unknown code
}
- sendResponse := &SendMessageResponse{}
- sendResponse.Decode(cmd.ExtFields)
-
msgIDs := make([]string, 0)
for i := 0; i < len(msgs); i++ {
msgIDs = append(msgIDs,
msgs[i].Properties[PropertyUniqueClientMessageIdKeyIndex])
@@ -346,19 +335,21 @@ func (c *RMQClient) processSendResponse(brokerName
string, msgs []*Message, cmd
regionId = defaultTraceRegionID
}
+ qId, _ := strconv.Atoi(cmd.ExtFields["queueId"])
+ off, _ := strconv.ParseInt(cmd.ExtFields["queueOffset"], 10, 64)
return &SendResult{
Status: status,
- MsgIDs: msgIDs,
- OffsetMsgID: sendResponse.MsgId,
+ MsgID: cmd.ExtFields["msgId"],
+ OffsetMsgID: cmd.ExtFields["msgId"],
MessageQueue: &MessageQueue{
Topic: msgs[0].Topic,
BrokerName: brokerName,
- QueueId: int(sendResponse.QueueId),
+ QueueId: qId,
},
- QueueOffset: sendResponse.QueueOffset,
- TransactionID: sendResponse.TransactionId,
- RegionID: regionId,
- TraceOn: trace != "" && trace != _TranceOff,
+ QueueOffset: off,
+ //TransactionID: sendResponse.TransactionId,
+ RegionID: regionId,
+ TraceOn: trace != "" && trace != _TranceOff,
}
}
@@ -427,6 +418,7 @@ func (c *RMQClient) UnregisterConsumer(group string) {
}
func (c *RMQClient) RegisterProducer(group string, producer InnerProducer) {
+ c.producerMap.Store(group, producer)
}
func (c *RMQClient) UnregisterProducer(group string) {
@@ -453,10 +445,10 @@ func (c *RMQClient) UpdatePublishInfo(topic string, data
*TopicRouteData) {
return
}
c.producerMap.Range(func(key, value interface{}) bool {
- consumer := value.(InnerProducer)
+ p := value.(InnerProducer)
publishInfo := routeData2PublishInfo(topic, data)
publishInfo.HaveTopicRouterInfo = true
- consumer.UpdateTopicPublishInfo(topic, publishInfo)
+ p.UpdateTopicPublishInfo(topic, publishInfo)
return true
})
}
@@ -464,8 +456,8 @@ func (c *RMQClient) UpdatePublishInfo(topic string, data
*TopicRouteData) {
func (c *RMQClient) isNeedUpdatePublishInfo(topic string) bool {
var result bool
c.producerMap.Range(func(key, value interface{}) bool {
- consumer := value.(InnerProducer)
- if consumer.IsPublishTopicNeedUpdate(topic) {
+ p := value.(InnerProducer)
+ if p.IsPublishTopicNeedUpdate(topic) {
result = true
return false
}
@@ -519,8 +511,24 @@ func routeData2SubscribeInfo(topic string, data
*TopicRouteData) []*MessageQueue
func encodeMessages(message []*Message) []byte {
var buffer bytes.Buffer
index := 0
- for index < len(message){
+ for index < len(message) {
buffer.Write(message[index].Body)
}
return buffer.Bytes()
}
+
+func brokerVIPChannel(brokerAddr string) string {
+ if !_VIPChannelEnable {
+ return brokerAddr
+ }
+ var brokerAddrNew strings.Builder
+ ipAndPort := strings.Split(brokerAddr, ":")
+ port, err := strconv.Atoi(ipAndPort[1])
+ if err != nil {
+ return ""
+ }
+ brokerAddrNew.WriteString(ipAndPort[0])
+ brokerAddrNew.WriteString(":")
+ brokerAddrNew.WriteString(strconv.Itoa(port - 2))
+ return brokerAddrNew.String()
+}
diff --git a/kernel/model.go b/kernel/model.go
index 0ddc4c3..8ed0f97 100644
--- a/kernel/model.go
+++ b/kernel/model.go
@@ -42,7 +42,7 @@ const (
// SendResult RocketMQ send result
type SendResult struct {
Status SendStatus
- MsgIDs []string
+ MsgID string
MessageQueue *MessageQueue
QueueOffset int64
TransactionID string
@@ -54,7 +54,7 @@ type SendResult struct {
// SendResult send message result to string(detail result)
func (result *SendResult) String() string {
return fmt.Sprintf("SendResult [sendStatus=%d, msgIds=%s, offsetMsgId=%s, queueOffset=%d, messageQueue=%s]",
- result.Status, result.MsgIDs, result.OffsetMsgID, result.QueueOffset, result.MessageQueue.String())
+ result.Status, result.MsgID, result.OffsetMsgID, result.QueueOffset, result.MessageQueue.String())
}
// PullStatus pull status
diff --git a/kernel/request.go b/kernel/request.go
index 9f16e44..5fe1a44 100644
--- a/kernel/request.go
+++ b/kernel/request.go
@@ -44,23 +44,34 @@ const (
)
type SendMessageRequest struct {
- ProducerGroup string `json:"producerGroup"`
- Topic string `json:"topic"`
- DefaultTopic string `json:"defaultTopic"`
- DefaultTopicQueueNums int `json:"defaultTopicQueueNums"`
- QueueId int32 `json:"queueId"`
- SysFlag int `json:"sysFlag"`
- BornTimestamp int64 `json:"bornTimestamp"`
- Flag int `json:"flag"`
- Properties string `json:"properties"`
- ReconsumeTimes int `json:"reconsumeTimes"`
- UnitMode bool `json:"unitMode"`
- MaxReconsumeTimes int `json:"maxReconsumeTimes"`
- Batch bool `json:"batch"`
+ ProducerGroup string `json:"producerGroup"`
+ Topic string `json:"topic"`
+ QueueId int `json:"queueId"`
+ SysFlag int `json:"sysFlag"`
+ BornTimestamp int64 `json:"bornTimestamp"`
+ Flag int32 `json:"flag"`
+ Properties string `json:"properties"`
+ ReconsumeTimes int `json:"reconsumeTimes"`
+ UnitMode bool `json:"unitMode"`
+ MaxReconsumeTimes int `json:"maxReconsumeTimes"`
+ Batch bool
}
func (request *SendMessageRequest) Encode() map[string]string {
- return nil
+ maps := make(map[string]string)
+ maps["producerGroup"] = request.ProducerGroup
+ maps["topic"] = request.Topic
+ maps["queueId"] = strconv.Itoa(request.QueueId)
+ maps["sysFlag"] = fmt.Sprintf("%d", request.SysFlag)
+ maps["bornTimestamp"] = strconv.FormatInt(request.BornTimestamp, 10)
+ maps["flag"] = fmt.Sprintf("%d", request.Flag)
+ maps["reconsumeTimes"] = strconv.Itoa(request.ReconsumeTimes)
+ maps["unitMode"] = strconv.FormatBool(request.UnitMode)
+ maps["maxReconsumeTimes"] = strconv.Itoa(request.MaxReconsumeTimes)
+ maps["defaultTopic"] = "TBW102"
+ maps["defaultTopicQueueNums"] = "4"
+ maps["batch"] = strconv.FormatBool(request.Batch)
+ return maps
}
func (request *SendMessageRequest) Decode(properties map[string]string) error {
diff --git a/kernel/route.go b/kernel/route.go
index a1bfe3a..d0678e6 100644
--- a/kernel/route.go
+++ b/kernel/route.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/rocketmq-client-go/utils"
"github.com/tidwall/gjson"
"math/rand"
+ "os"
"sort"
"strconv"
"strings"
@@ -34,6 +35,8 @@ import (
)
const (
+ EnvNameServerAddr = "NAMESRV_ADDR"
+
requestTimeout = 3 * time.Second
defaultTopic = "TBW102"
defaultQueueNums = 4
@@ -52,7 +55,6 @@ var (
// brokerName -> map[string]int32
brokerVersionMap sync.Map
- publishInfoMap sync.Map
//subscribeInfoMap sync.Map
routeDataMap sync.Map
lockNamesrv sync.Mutex
@@ -183,16 +185,6 @@ func FindBrokerAddrByName(brokerName string) string {
return bd.(*BrokerData).BrokerAddresses[MasterId]
}
-func FindTopicPublishInfo(topic string) *TopicPublishInfo {
- tpi, exist := publishInfoMap.Load(topic)
- if exist {
- if tpi.(*TopicPublishInfo).isOK() {
- return tpi.(*TopicPublishInfo)
- }
- }
- return nil
-}
-
func FindBrokerAddressInSubscribe(brokerName string, brokerId int64,
onlyThisBroker bool) *FindBrokerResult {
var (
brokerAddr = ""
@@ -378,7 +370,7 @@ func routeData2PublishInfo(topic string, data
*TopicRouteData) *TopicPublishInfo
}
func getNameServerAddress() string {
- return "127.0.0.1:9876"
+ return os.Getenv(EnvNameServerAddr)
}
// TopicRouteData TopicRouteData
diff --git a/producer.go b/producer.go
deleted file mode 100644
index 0bc55ea..0000000
--- a/producer.go
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package rocketmq
-
-import (
- "context"
- "encoding/json"
- "github.com/apache/rocketmq-client-go/common"
- "github.com/apache/rocketmq-client-go/kernel"
- "github.com/apache/rocketmq-client-go/rlog"
- "github.com/apache/rocketmq-client-go/utils"
- "os"
- "strconv"
- "strings"
- "sync/atomic"
- "time"
-)
-
-type Producer interface {
- Start()
- Send(msg *kernel.Message) (*kernel.SendResult, error)
-}
-
-type defaultProducer struct {
- client *kernel.RMQClient
- state kernel.ServiceState
- config ProducerConfig
-}
-
-type ProducerConfig struct {
- GroupName string
- CommunicationType int
- RetryTimesWhenSendFailed int
-}
-
-func (c *defaultProducer) Send(msg *kernel.Message) (*kernel.SendResult,
error) {
- result :=tryToFindTopicPublishInfo(msg.Topic)
- if result != nil && result.HaveTopicRouterInfo {
- if c.config.RetryTimesWhenSendFailed == 0 {
- c.config.RetryTimesWhenSendFailed = 2
- }
- retryTime := 1
- if c.config.CommunicationType == common.SYNC {
- retryTime = 1 + c.config.RetryTimesWhenSendFailed
- }
- for retryCount := 0; retryCount < retryTime; retryCount++ {
- messageQueue := getMessageQueue(result)
- if messageQueue == nil {
- continue
- }
-
- brokerAddress :=
kernel.FindBrokerAddrByName(messageQueue.BrokerName)
- if brokerAddress == "" {
- break
- }
- brokerAddress = brokerVIPChannel(brokerAddress)
- sysFlag := 0
- tranMsg := msg.Properties["TRAN_MSG"]
- if tranMsg != "" {
- tranMsgBool, err :=strconv.ParseBool(tranMsg)
- if err == nil && tranMsgBool {
- sysFlag |=
utils.TRANSACTION_PREPARED_TYPE;
- }
- sendMsg, err := getSendMessage(msg, c,
messageQueue, sysFlag)
- if err != nil {
- continue
- }
- return sendKernel(brokerAddress, c, msg,
sendMsg, messageQueue)
- }
- }
- }
- return nil, nil
-}
-
-func sendKernel(brokerAddress string, receive *defaultProducer, msg
*kernel.Message, sendMsg *kernel.SendMessageRequest, messageQueue
*kernel.MessageQueue) (*kernel.SendResult, error) {
- switch receive.config.CommunicationType{
- case common.SYNC:
- return receive.client.SendMessageSync(context.Background(),
brokerAddress, messageQueue.BrokerName, sendMsg, []*kernel.Message{msg})
- case common.ASYNC:
- //kernel.SendMessageAsync()
- return nil,nil
- case common.ONEWAY:
- return receive.client.SendMessageOneWay(context.Background(),
brokerAddress, sendMsg, []*kernel.Message{msg})
- default:
- return nil,nil
- }
-}
-
-func getSendMessage(msg *kernel.Message, receiver *defaultProducer,
messageQueue *kernel.MessageQueue, sysFlag int)
(*kernel.SendMessageRequest,error) {
-
- properties, err := json.Marshal(msg.Properties)
- maxReconsumeTimes, errtimes :=
strconv.Atoi(msg.Properties["MAX_RECONSUME_TIMES"])
- if errtimes != nil {
- maxReconsumeTimes = 1
- }
- if err == nil {
- sendMessageRequest := &kernel.SendMessageRequest{
- ProducerGroup: receiver.config.GroupName,
- Topic: msg.Topic,
- DefaultTopic: "TBW102",
- DefaultTopicQueueNums: 8,
- QueueId: int32(messageQueue.QueueId),
- SysFlag: sysFlag,
- BornTimestamp: time.Now().Unix(),
- Flag: int(msg.Flag),
- Properties: string(properties),
- ReconsumeTimes: 0,
- UnitMode: false,
- MaxReconsumeTimes: maxReconsumeTimes,
- Batch: msg.Batch,
- }
- return sendMessageRequest,nil
- }
- return nil,err
-}
-
-
-func brokerVIPChannel(brokerAddr string) string {
- var brokerAddrNew strings.Builder
- var isChange bool
- var err error
- if os.Getenv("com.rocketmq.sendMessageWithVIPChannel") != "" {
- isChange, err =
strconv.ParseBool(os.Getenv("com.rocketmq.sendMessageWithVIPChannel"))
- if err != nil {
- isChange = true
- }
- }else{
- isChange = true
- }
- if isChange {
- ipAndPort := strings.Split(brokerAddr, ":")
-
- port, err :=strconv.Atoi(ipAndPort[1])
-
- if err != nil {
- return ""
- }
- brokerAddrNew.WriteString(ipAndPort[0])
- brokerAddrNew.WriteString(":")
- brokerAddrNew.WriteString(strconv.Itoa(port-2))
- return brokerAddrNew.String();
- } else {
- return brokerAddr;
- }
-}
-
-
-func tryToFindTopicPublishInfo(topic string) *kernel.TopicPublishInfo {
- result := kernel.FindTopicPublishInfo(topic)
-
- if result == nil {
- kernel.UpdateTopicRouteInfo(topic)
- }
- return kernel.FindTopicPublishInfo(topic)
-}
-
-func getMessageQueue(tpInfo *kernel.TopicPublishInfo) *kernel.MessageQueue {
- if tpInfo.MqList != nil && len(tpInfo.MqList) <= 0 {
- rlog.Error("can not find proper message queue")
- return nil
- }
- return tpInfo.MqList[int(atomic.AddInt32(&tpInfo.TopicQueueIndex,
1))%len(tpInfo.MqList)]
-}
-
-func (c *defaultProducer) Start() {
- c.state = kernel.StateRunning
-}
-
-func NewProducer(config ProducerConfig) Producer {
- return &defaultProducer{
- config: config,
- }
-}
-
diff --git a/producer/producer.go b/producer/producer.go
new file mode 100644
index 0000000..4c58905
--- /dev/null
+++ b/producer/producer.go
@@ -0,0 +1,203 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package producer
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "github.com/apache/rocketmq-client-go/kernel"
+ "github.com/apache/rocketmq-client-go/remote"
+ "github.com/apache/rocketmq-client-go/rlog"
+ "os"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+type Producer interface {
+ Start() error
+ Shutdown() error
+ SendSync(context.Context, *kernel.Message) (*kernel.SendResult, error)
+ //SendOneWay(context.Context, *kernel.Message) error
+}
+
+func NewProducer(opt ProducerOptions) Producer {
+ if opt.RetryTimesWhenSendFailed == 0 {
+ opt.RetryTimesWhenSendFailed = 2
+ }
+ if opt.NameServerAddr == "" {
+ rlog.Fatal("opt.NameServerAddr can't be empty")
+ }
+ err := os.Setenv(kernel.EnvNameServerAddr, opt.NameServerAddr)
+ if err != nil {
+ rlog.Fatal("set env=EnvNameServerAddr error: %s ", err.Error())
+ }
+ return &defaultProducer{
+ group: "default",
+ client: kernel.GetOrNewRocketMQClient(opt.ClientOption),
+ options: opt,
+ }
+}
+
+type defaultProducer struct {
+ group string
+ client *kernel.RMQClient
+ state kernel.ServiceState
+ options ProducerOptions
+ publishInfo sync.Map
+}
+
+type ProducerOptions struct {
+ kernel.ClientOption
+ NameServerAddr string
+ GroupName string
+ RetryTimesWhenSendFailed int
+ UnitMode bool
+}
+
+func (p *defaultProducer) Start() error {
+ p.state = kernel.StateRunning
+ p.client.RegisterProducer(p.group, p)
+ p.client.Start()
+ return nil
+}
+
+func (p *defaultProducer) Shutdown() error {
+ return nil
+}
+
+func (p *defaultProducer) SendSync(ctx context.Context, msg *kernel.Message) (*kernel.SendResult, error) {
+ if msg == nil {
+ return nil, errors.New("message is nil")
+ }
+
+ if msg.Topic == "" {
+ return nil, errors.New("topic is nil")
+ }
+
+ retryTime := 1 + p.options.RetryTimesWhenSendFailed
+
+ var (
+ err error
+ )
+ for retryCount := 0; retryCount < retryTime; retryCount++ {
+ mq := p.selectMessageQueue(msg.Topic)
+ if mq == nil {
+ err = fmt.Errorf("the topic=%s route info not found", msg.Topic)
+ continue
+ }
+
+ addr := kernel.FindBrokerAddrByName(mq.BrokerName)
+ if addr == "" {
+ return nil, fmt.Errorf("topic=%s route info not found", mq.Topic)
+ }
+
+ res, _err := p.client.InvokeSync(addr, p.buildSendRequest(mq, msg), 3*time.Second)
+ if _err != nil {
+ err = _err
+ continue
+ }
+ return p.client.ProcessSendResponse(mq.BrokerName, res, msg), nil
+ }
+ return nil, err
+}
+
+func (p *defaultProducer) SendOneWay(context.Context, string,
...*kernel.Message) error {
+ return nil
+}
+
+func (p *defaultProducer) buildSendRequest(mq *kernel.MessageQueue, msg
*kernel.Message) *remote.RemotingCommand {
+ req := &kernel.SendMessageRequest{
+ ProducerGroup: p.group,
+ Topic: mq.Topic,
+ QueueId: mq.QueueId,
+ SysFlag: 0,
+ BornTimestamp: time.Now().UnixNano() / int64(time.Millisecond),
+ Flag: msg.Flag,
+ Properties: propertiesToString(msg.Properties),
+ ReconsumeTimes: 0,
+ UnitMode: p.options.UnitMode,
+ Batch: false,
+ }
+ return remote.NewRemotingCommand(kernel.ReqSendMessage, req, msg.Body)
+}
+
+func (p *defaultProducer) selectMessageQueue(topic string)
*kernel.MessageQueue {
+ v, exist := p.publishInfo.Load(topic)
+
+ if !exist {
+ p.client.UpdatePublishInfo(topic,
kernel.UpdateTopicRouteInfo(topic))
+ v, exist = p.publishInfo.Load(topic)
+ }
+
+ if !exist {
+ return nil
+ }
+
+ result := v.(*kernel.TopicPublishInfo)
+ if result == nil || !result.HaveTopicRouterInfo {
+ return nil
+ }
+
+ if result.MqList != nil && len(result.MqList) <= 0 {
+ rlog.Error("can not find proper message queue")
+ return nil
+ }
+ return result.MqList[int(atomic.AddInt32(&result.TopicQueueIndex,
1))%len(result.MqList)]
+}
+
+func (p *defaultProducer) PublishTopicList() []string {
+ topics := make([]string, 0)
+ p.publishInfo.Range(func(key, value interface{}) bool {
+ topics = append(topics, key.(string))
+ return true
+ })
+ return topics
+}
+
+func (p *defaultProducer) UpdateTopicPublishInfo(topic string, info
*kernel.TopicPublishInfo) {
+ if topic == "" || info == nil {
+ return
+ }
+ p.publishInfo.Store(topic, info)
+}
+
+func (p *defaultProducer) IsPublishTopicNeedUpdate(topic string) bool {
+ v, exist := p.publishInfo.Load(topic)
+ if !exist {
+ return true
+ }
+ info := v.(*kernel.TopicPublishInfo)
+ return info.MqList == nil || len(info.MqList) == 0
+}
+
+func (p *defaultProducer) IsUnitMode() bool {
+ return false
+}
+
+func propertiesToString(properties map[string]string) string {
+ if properties == nil {
+ return ""
+ }
+ var str string
+ for k, v := range properties {
+ str += fmt.Sprintf("%s%v%s%v", k, byte(1), v, byte(2))
+ }
+ return str
+}
diff --git a/producer_test.go b/producer_test.go
deleted file mode 100644
index 9fa466e..0000000
--- a/producer_test.go
+++ /dev/null
@@ -1,12 +0,0 @@
-package rocketmq
-
-import (
- "testing"
-)
-
-func TestProducer_Send(t *testing.T) {
- producer := NewProducer(ProducerConfig{
- GroupName: "testGroup",
- })
- producer.Start()
-}
diff --git a/utils/messagesysflag.go b/utils/fun.go
similarity index 61%
copy from utils/messagesysflag.go
copy to utils/fun.go
index c113be1..4e9e4ea 100644
--- a/utils/messagesysflag.go
+++ b/utils/fun.go
@@ -16,31 +16,3 @@ limitations under the License.
*/
package utils
-
-
-
-var(
- COMPRESSED_FLAG = 0x1
-
- MULTI_TAGS_FLAG = 0x1 << 1
-
- TRANSACTION_NOT_TYPE = 0
-
- TRANSACTION_PREPARED_TYPE = 0x1 << 2
-
- TRANSACTION_COMMIT_TYPE = 0x2 << 2
-
- TRANSACTION_ROLLBACK_TYPE = 0x3 << 2
-)
-
-func GetTransactionValue(flag int) int {
- return flag & TRANSACTION_ROLLBACK_TYPE
-}
-
-func ResetTransactionValue(flag int, typeFlag int) int {
- return (flag & (^TRANSACTION_ROLLBACK_TYPE)) | typeFlag
-}
-
-func ClearCompressedFlag(flag int) int {
- return flag & (^COMPRESSED_FLAG)
-}
\ No newline at end of file
diff --git a/utils/messagesysflag.go b/utils/messagesysflag.go
index c113be1..589490d 100644
--- a/utils/messagesysflag.go
+++ b/utils/messagesysflag.go
@@ -17,9 +17,7 @@ limitations under the License.
package utils
-
-
-var(
+var (
COMPRESSED_FLAG = 0x1
MULTI_TAGS_FLAG = 0x1 << 1
@@ -43,4 +41,4 @@ func ResetTransactionValue(flag int, typeFlag int) int {
func ClearCompressedFlag(flag int) int {
return flag & (^COMPRESSED_FLAG)
-}
\ No newline at end of file
+}