This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new f14b760  Refactoring Producer (#58)
f14b760 is described below

commit f14b7607bf1ef1fcc8cd5f9fb97418b1cbd8c95d
Author: wenfeng <[email protected]>
AuthorDate: Fri May 10 13:51:56 2019 +0800

    Refactoring Producer (#58)
    
    * refactor producer
    
    * fix msg decode bug
    
    * make name srv configurable
---
 common/CommonConstant.go                |   2 +-
 consumer/consumer.go                    |   2 +
 consumer/push_consumer.go               |  22 +++-
 examples/consumer/main.go               |  16 +--
 examples/{consumer => producer}/main.go |  45 +++----
 go.mod                                  |  13 +-
 go.sum                                  |  16 +--
 kernel/client.go                        |  92 ++++++++-------
 kernel/model.go                         |   4 +-
 kernel/request.go                       |  39 +++---
 kernel/route.go                         |  16 +--
 producer.go                             | 189 -----------------------------
 producer/producer.go                    | 203 ++++++++++++++++++++++++++++++++
 producer_test.go                        |  12 --
 utils/{messagesysflag.go => fun.go}     |  28 -----
 utils/messagesysflag.go                 |   6 +-
 16 files changed, 344 insertions(+), 361 deletions(-)

diff --git a/common/CommonConstant.go b/common/CommonConstant.go
index bd4e7ce..754114c 100644
--- a/common/CommonConstant.go
+++ b/common/CommonConstant.go
@@ -5,4 +5,4 @@ const (
        SYNC
        ASYNC
        ONEWAY
-)
\ No newline at end of file
+)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index a6d9500..91da8cf 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -194,6 +194,8 @@ func (pr *PullRequest) String() string {
 
 type ConsumerOption struct {
        kernel.ClientOption
+       NameServerAddr string
+
        /**
         * Backtracking consumption time with second precision. Time format is
         * 20131223171201<br>
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 2039be1..abc087e 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -24,6 +24,7 @@ import (
        "github.com/apache/rocketmq-client-go/rlog"
        "github.com/apache/rocketmq-client-go/utils"
        "math"
+       "os"
        "strconv"
        "time"
 )
@@ -63,6 +64,13 @@ type pushConsumer struct {
 func NewPushConsumer(consumerGroup string, opt ConsumerOption) PushConsumer {
        opt.InstanceName = "DEFAULT"
        opt.ClientIP = utils.LocalIP()
+       if opt.NameServerAddr == "" {
+               rlog.Fatal("opt.NameServerAddr can't be empty")
+       }
+       err := os.Setenv(kernel.EnvNameServerAddr, opt.NameServerAddr)
+       if err != nil {
+               rlog.Fatal("set env=EnvNameServerAddr error: %s ", err.Error())
+       }
        dc := &defaultConsumer{
                consumerGroup:  consumerGroup,
                cType:          _PushConsume,
@@ -451,7 +459,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                                pq.putMessage(msgFounded...)
                        }
                        if result.NextBeginOffset < prevRequestOffset || 
firstMsgOffset < prevRequestOffset {
-                               rlog.Warnf("[BUG] pull message result maybe 
data wrong, [nextBeginOffset=%s, "+
+                               rlog.Warnf("[BUG] pull message result maybe 
data wrong, [nextBeginOffset=%d, "+
                                        "firstMsgOffset=%d, 
prevRequestOffset=%d]", result.NextBeginOffset, firstMsgOffset, 
prevRequestOffset)
                        }
                case kernel.PullNoNewMsg:
@@ -607,12 +615,14 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *kernel.Mes
                        groupTopic := kernel.RetryGroupTopicPrefix + 
pc.consumerGroup
                        for idx := range subMsgs {
                                msg := subMsgs[idx]
-                               retryTopic := 
msg.Properties[kernel.PropertyRetryTopic]
-                               if retryTopic == "" && groupTopic == msg.Topic {
-                                       msg.Topic = retryTopic
+                               if msg.Properties != nil {
+                                       retryTopic := 
msg.Properties[kernel.PropertyRetryTopic]
+                                       if retryTopic == "" && groupTopic == 
msg.Topic {
+                                               msg.Topic = retryTopic
+                                       }
+                                       
subMsgs[idx].Properties[kernel.PropertyConsumeStartTime] = strconv.FormatInt(
+                                               
beginTime.UnixNano()/int64(time.Millisecond), 10)
                                }
-                               
subMsgs[idx].Properties[kernel.PropertyConsumeStartTime] = strconv.FormatInt(
-                                       
beginTime.UnixNano()/int64(time.Millisecond), 10)
                        }
                        result, err := pc.consume(ctx, subMsgs)
                        consumeRT := time.Now().Sub(beginTime)
diff --git a/examples/consumer/main.go b/examples/consumer/main.go
index 2b563ff..38e780e 100644
--- a/examples/consumer/main.go
+++ b/examples/consumer/main.go
@@ -22,22 +22,22 @@ import (
        "github.com/apache/rocketmq-client-go/consumer"
        "github.com/apache/rocketmq-client-go/kernel"
        "os"
-       "sync/atomic"
        "time"
 )
 
 func main() {
        c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
-               ConsumerModel: consumer.Clustering,
-               FromWhere:     consumer.ConsumeFromFirstOffset,
+               NameServerAddr: "127.0.0.1:9876",
+               ConsumerModel:  consumer.Clustering,
+               FromWhere:      consumer.ConsumeFromFirstOffset,
        })
-       var count int64
+       //var count int64
        err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx 
*consumer.ConsumeMessageContext,
                msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
-               c := atomic.AddInt64(&count, int64(len(msgs)))
-               if c%1000 == 0 {
-                       fmt.Println(c)
-               }
+               //c := atomic.AddInt64(&count, int64(len(msgs)))
+               //if c%1000 == 0 {
+               fmt.Println(msgs)
+               //}
                return consumer.ConsumeSuccess, nil
        })
        if err != nil {
diff --git a/examples/consumer/main.go b/examples/producer/main.go
similarity index 57%
copy from examples/consumer/main.go
copy to examples/producer/main.go
index 2b563ff..83bd127 100644
--- a/examples/consumer/main.go
+++ b/examples/producer/main.go
@@ -18,35 +18,38 @@ limitations under the License.
 package main
 
 import (
+       "context"
        "fmt"
-       "github.com/apache/rocketmq-client-go/consumer"
        "github.com/apache/rocketmq-client-go/kernel"
+       "github.com/apache/rocketmq-client-go/producer"
        "os"
-       "sync/atomic"
-       "time"
 )
 
 func main() {
-       c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
-               ConsumerModel: consumer.Clustering,
-               FromWhere:     consumer.ConsumeFromFirstOffset,
-       })
-       var count int64
-       err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx 
*consumer.ConsumeMessageContext,
-               msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
-               c := atomic.AddInt64(&count, int64(len(msgs)))
-               if c%1000 == 0 {
-                       fmt.Println(c)
-               }
-               return consumer.ConsumeSuccess, nil
-       })
+       opt := producer.ProducerOptions{
+               NameServerAddr:           "127.0.0.1:9876",
+               RetryTimesWhenSendFailed: 2,
+       }
+       p := producer.NewProducer(opt)
+       err := p.Start()
        if err != nil {
-               fmt.Println(err.Error())
+               fmt.Printf("start producer error: %s", err.Error())
+               os.Exit(1)
+       }
+       for i := 0; i < 1000; i++ {
+               res, err := p.SendSync(context.Background(), &kernel.Message{
+                       Topic: "test",
+                       Body:  []byte("Hello RocketMQ Go Client!"),
+               })
+
+               if err != nil {
+                       fmt.Printf("send message error: %s\n", err)
+               } else {
+                       fmt.Printf("send message success: result=%s\n", 
res.String())
+               }
        }
-       err = c.Start()
+       err = p.Shutdown()
        if err != nil {
-               fmt.Println(err.Error())
-               os.Exit(-1)
+               fmt.Printf("shundown producer error: %s", err.Error())
        }
-       time.Sleep(time.Hour)
 }
diff --git a/go.mod b/go.mod
index 419d8ba..f8a9967 100644
--- a/go.mod
+++ b/go.mod
@@ -1,19 +1,12 @@
 module github.com/apache/rocketmq-client-go
 
-go 1.11
+go 1.12
 
 require (
        github.com/emirpasic/gods v1.12.0
-       github.com/sirupsen/logrus v1.3.0
+       github.com/sirupsen/logrus v1.4.1
        github.com/stretchr/testify v1.3.0
        github.com/tidwall/gjson v1.2.1
        github.com/tidwall/match v1.0.1 // indirect
-       github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51 // indirect
-)
-
-replace (
-       golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 => 
github.com/golang/crypto v0.0.0-20180904163835-0709b304e793
-       golang.org/x/net v0.0.0-20180821023952-922f4815f713 => 
github.com/golang/net v0.0.0-20180826012351-8a410e7b638d
-       golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 => 
github.com/golang/sys v0.0.0-20180905080454-ebe1bf3edb33
-       golang.org/x/text v0.3.0 => github.com/golang/text v0.3.0
+       github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 // indirect
 )
diff --git a/go.sum b/go.sum
index 61c5e19..07969a4 100644
--- a/go.sum
+++ b/go.sum
@@ -1,28 +1,20 @@
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/golang/crypto v0.0.0-20180904163835-0709b304e793/go.mod 
h1:uZvAcrsnNaCxlh1HorK5dUQHGmEKPh2H/Rl1kehswPo=
-github.com/golang/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod 
h1:5JyrLPvD/ZdaYkT7IqKhsP5xt7aLjA99KXRtk4EIYDk=
 github.com/emirpasic/gods v1.12.0 
h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
 github.com/emirpasic/gods v1.12.0/go.mod 
h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
-github.com/konsorten/go-windows-terminal-sequences v1.0.1 
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod 
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
-github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/sirupsen/logrus v1.3.0 
h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
-github.com/sirupsen/logrus v1.3.0/go.mod 
h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.1 
h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
+github.com/sirupsen/logrus v1.4.1/go.mod 
h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
 github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.1.1/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.2.2/go.mod 
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
-github.com/stretchr/testify v1.3.0 
h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
 github.com/stretchr/testify v1.3.0/go.mod 
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/tidwall/gjson v1.2.1 h1:j0efZLrZUvNerEf6xqoi0NjWMK5YlLrR7Guo/dxY174=
 github.com/tidwall/gjson v1.2.1/go.mod 
h1:c/nTNbUr0E0OrXEhq1pwa8iEgc2DOt4ZZqAt1HtCkPA=
 github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
 github.com/tidwall/match v1.0.1/go.mod 
h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
-github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51 
h1:BP2bjP495BBPaBcS5rmqviTfrOkN5rO5ceKAMRZCRFc=
-github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod 
h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
-golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 
h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
-golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod 
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 
h1:rQ229MBgvW68s1/g6f1/63TgYwYxfF4E+bi/KC19P8g=
+github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65/go.mod 
h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 
h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/kernel/client.go b/kernel/client.go
index ae258e4..2736d6a 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -52,8 +52,19 @@ const (
 
 var (
        ErrServiceState = errors.New("service state is not running, please 
check")
+
+       _VIPChannelEnable = false
 )
 
+func init() {
+       if os.Getenv("com.rocketmq.sendMessageWithVIPChannel") != "" {
+               value, err := 
strconv.ParseBool(os.Getenv("com.rocketmq.sendMessageWithVIPChannel"))
+               if err == nil {
+                       _VIPChannelEnable = value
+               }
+       }
+}
+
 type ClientOption struct {
        NameServerAddr    string
        ClientIP          string
@@ -71,8 +82,8 @@ func (opt *ClientOption) ChangeInstanceNameToPID() {
 }
 
 func (opt *ClientOption) String() string {
-       return fmt.Sprintf("ClientOption [NameServerAddr=%s, ClientIP=%s, 
InstanceName=%s, "+
-               "UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v, UseTLS=%v]", 
opt.NameServerAddr, opt.ClientIP,
+       return fmt.Sprintf("ClientOption [ClientIP=%s, InstanceName=%s, "+
+               "UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v, UseTLS=%v]", 
opt.ClientIP,
                opt.InstanceName, opt.UnitMode, opt.UnitName, 
opt.VIPChannelEnabled, opt.UseTLS)
 }
 
@@ -80,9 +91,8 @@ type InnerProducer interface {
        PublishTopicList() []string
        UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
        IsPublishTopicNeedUpdate(topic string) bool
-       GetCheckListener() func(msg *MessageExt)
-       GetTransactionListener() TransactionListener
-       isUnitMode() bool
+       //GetTransactionListener() TransactionListener
+       IsUnitMode() bool
 }
 
 type InnerConsumer interface {
@@ -104,6 +114,7 @@ type RMQClient struct {
        once        sync.Once
 
        remoteClient *remote.RemotingClient
+       hbMutex      sync.Mutex
 }
 
 var clientMap sync.Map
@@ -193,6 +204,8 @@ func (c *RMQClient) CheckClientInBroker() {
 
 // TODO
 func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
+       c.hbMutex.Lock()
+       defer c.hbMutex.Unlock()
        hbData := &heartbeatData{
                ClientId: c.ClientID(),
        }
@@ -279,27 +292,6 @@ func (c *RMQClient) UpdateTopicRouteInfo() {
        }
 }
 
-// SendMessage with batch by sync
-func (c *RMQClient) SendMessageSync(ctx context.Context, brokerAddrs, 
brokerName string, request *SendMessageRequest,
-       msgs []*Message) (*SendResult, error) {
-       cmd := getRemotingCommand(request, msgs)
-       response, err := c.remoteClient.InvokeSync(brokerAddrs, cmd, 
3*time.Second)
-       if err != nil {
-               rlog.Warnf("send messages with sync error: %v", err)
-               return nil, err
-       }
-
-       return c.processSendResponse(brokerName, msgs, response), nil
-}
-
-func getRemotingCommand(request *SendMessageRequest,  msgs []*Message) 
*remote.RemotingCommand {
-       if request.Batch {
-               return remote.NewRemotingCommand(ReqSendBatchMessage, request, 
encodeMessages(msgs))
-       } else {
-               return remote.NewRemotingCommand(ReqSendMessage, request, 
encodeMessages(msgs))
-       }
-}
-
 // SendMessageAsync send message with batch by async
 func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, 
brokerName string, request *SendMessageRequest,
        msgs []*Message, f func(result *SendResult)) error {
@@ -316,7 +308,7 @@ func (c *RMQClient) SendMessageOneWay(ctx context.Context, 
brokerAddrs string, r
        return nil, err
 }
 
-func (c *RMQClient) processSendResponse(brokerName string, msgs []*Message, 
cmd *remote.RemotingCommand) *SendResult {
+func (c *RMQClient) ProcessSendResponse(brokerName string, cmd 
*remote.RemotingCommand, msgs ...*Message) *SendResult {
        var status SendStatus
        switch cmd.Code {
        case ResFlushDiskTimeout:
@@ -331,9 +323,6 @@ func (c *RMQClient) processSendResponse(brokerName string, 
msgs []*Message, cmd
                // TODO process unknown code
        }
 
-       sendResponse := &SendMessageResponse{}
-       sendResponse.Decode(cmd.ExtFields)
-
        msgIDs := make([]string, 0)
        for i := 0; i < len(msgs); i++ {
                msgIDs = append(msgIDs, 
msgs[i].Properties[PropertyUniqueClientMessageIdKeyIndex])
@@ -346,19 +335,21 @@ func (c *RMQClient) processSendResponse(brokerName 
string, msgs []*Message, cmd
                regionId = defaultTraceRegionID
        }
 
+       qId, _ := strconv.Atoi(cmd.ExtFields["queueId"])
+       off, _ := strconv.ParseInt(cmd.ExtFields["queueOffset"], 10, 64)
        return &SendResult{
                Status:      status,
-               MsgIDs:      msgIDs,
-               OffsetMsgID: sendResponse.MsgId,
+               MsgID:       cmd.ExtFields["msgId"],
+               OffsetMsgID: cmd.ExtFields["msgId"],
                MessageQueue: &MessageQueue{
                        Topic:      msgs[0].Topic,
                        BrokerName: brokerName,
-                       QueueId:    int(sendResponse.QueueId),
+                       QueueId:    qId,
                },
-               QueueOffset:   sendResponse.QueueOffset,
-               TransactionID: sendResponse.TransactionId,
-               RegionID:      regionId,
-               TraceOn:       trace != "" && trace != _TranceOff,
+               QueueOffset: off,
+               //TransactionID: sendResponse.TransactionId,
+               RegionID: regionId,
+               TraceOn:  trace != "" && trace != _TranceOff,
        }
 }
 
@@ -427,6 +418,7 @@ func (c *RMQClient) UnregisterConsumer(group string) {
 }
 
 func (c *RMQClient) RegisterProducer(group string, producer InnerProducer) {
+       c.producerMap.Store(group, producer)
 }
 
 func (c *RMQClient) UnregisterProducer(group string) {
@@ -453,10 +445,10 @@ func (c *RMQClient) UpdatePublishInfo(topic string, data 
*TopicRouteData) {
                return
        }
        c.producerMap.Range(func(key, value interface{}) bool {
-               consumer := value.(InnerProducer)
+               p := value.(InnerProducer)
                publishInfo := routeData2PublishInfo(topic, data)
                publishInfo.HaveTopicRouterInfo = true
-               consumer.UpdateTopicPublishInfo(topic, publishInfo)
+               p.UpdateTopicPublishInfo(topic, publishInfo)
                return true
        })
 }
@@ -464,8 +456,8 @@ func (c *RMQClient) UpdatePublishInfo(topic string, data 
*TopicRouteData) {
 func (c *RMQClient) isNeedUpdatePublishInfo(topic string) bool {
        var result bool
        c.producerMap.Range(func(key, value interface{}) bool {
-               consumer := value.(InnerProducer)
-               if consumer.IsPublishTopicNeedUpdate(topic) {
+               p := value.(InnerProducer)
+               if p.IsPublishTopicNeedUpdate(topic) {
                        result = true
                        return false
                }
@@ -519,8 +511,24 @@ func routeData2SubscribeInfo(topic string, data 
*TopicRouteData) []*MessageQueue
 func encodeMessages(message []*Message) []byte {
        var buffer bytes.Buffer
        index := 0
-       for index < len(message){
+       for index < len(message) {
                buffer.Write(message[index].Body)
        }
        return buffer.Bytes()
 }
+
+func brokerVIPChannel(brokerAddr string) string {
+       if !_VIPChannelEnable {
+               return brokerAddr
+       }
+       var brokerAddrNew strings.Builder
+       ipAndPort := strings.Split(brokerAddr, ":")
+       port, err := strconv.Atoi(ipAndPort[1])
+       if err != nil {
+               return ""
+       }
+       brokerAddrNew.WriteString(ipAndPort[0])
+       brokerAddrNew.WriteString(":")
+       brokerAddrNew.WriteString(strconv.Itoa(port - 2))
+       return brokerAddrNew.String()
+}
diff --git a/kernel/model.go b/kernel/model.go
index 0ddc4c3..8ed0f97 100644
--- a/kernel/model.go
+++ b/kernel/model.go
@@ -42,7 +42,7 @@ const (
 // SendResult RocketMQ send result
 type SendResult struct {
        Status        SendStatus
-       MsgIDs        []string
+       MsgID         string
        MessageQueue  *MessageQueue
        QueueOffset   int64
        TransactionID string
@@ -54,7 +54,7 @@ type SendResult struct {
 // SendResult send message result to string(detail result)
 func (result *SendResult) String() string {
        return fmt.Sprintf("SendResult [sendStatus=%d, msgIds=%s, 
offsetMsgId=%s, queueOffset=%d, messageQueue=%s]",
-               result.Status, result.MsgIDs, result.OffsetMsgID, 
result.QueueOffset, result.MessageQueue.String())
+               result.Status, result.MsgID, result.OffsetMsgID, 
result.QueueOffset, result.MessageQueue.String())
 }
 
 // PullStatus pull status
diff --git a/kernel/request.go b/kernel/request.go
index 9f16e44..5fe1a44 100644
--- a/kernel/request.go
+++ b/kernel/request.go
@@ -44,23 +44,34 @@ const (
 )
 
 type SendMessageRequest struct {
-       ProducerGroup         string `json:"producerGroup"`
-       Topic                 string `json:"topic"`
-       DefaultTopic          string `json:"defaultTopic"`
-       DefaultTopicQueueNums int    `json:"defaultTopicQueueNums"`
-       QueueId               int32  `json:"queueId"`
-       SysFlag               int    `json:"sysFlag"`
-       BornTimestamp         int64  `json:"bornTimestamp"`
-       Flag                  int    `json:"flag"`
-       Properties            string `json:"properties"`
-       ReconsumeTimes        int    `json:"reconsumeTimes"`
-       UnitMode              bool   `json:"unitMode"`
-       MaxReconsumeTimes     int    `json:"maxReconsumeTimes"`
-       Batch                 bool   `json:"batch"`
+       ProducerGroup     string `json:"producerGroup"`
+       Topic             string `json:"topic"`
+       QueueId           int    `json:"queueId"`
+       SysFlag           int    `json:"sysFlag"`
+       BornTimestamp     int64  `json:"bornTimestamp"`
+       Flag              int32  `json:"flag"`
+       Properties        string `json:"properties"`
+       ReconsumeTimes    int    `json:"reconsumeTimes"`
+       UnitMode          bool   `json:"unitMode"`
+       MaxReconsumeTimes int    `json:"maxReconsumeTimes"`
+       Batch             bool
 }
 
 func (request *SendMessageRequest) Encode() map[string]string {
-       return nil
+       maps := make(map[string]string)
+       maps["producerGroup"] = request.ProducerGroup
+       maps["topic"] = request.Topic
+       maps["queueId"] = strconv.Itoa(request.QueueId)
+       maps["sysFlag"] = fmt.Sprintf("%d", request.SysFlag)
+       maps["bornTimestamp"] = strconv.FormatInt(request.BornTimestamp, 10)
+       maps["flag"] = fmt.Sprintf("%d", request.Flag)
+       maps["reconsumeTimes"] = strconv.Itoa(request.ReconsumeTimes)
+       maps["unitMode"] = strconv.FormatBool(request.UnitMode)
+       maps["maxReconsumeTimes"] = strconv.Itoa(request.MaxReconsumeTimes)
+       maps["defaultTopic"] = "TBW102"
+       maps["defaultTopicQueueNums"] = "4"
+       maps["batch"] = strconv.FormatBool(request.Batch)
+       return maps
 }
 
 func (request *SendMessageRequest) Decode(properties map[string]string) error {
diff --git a/kernel/route.go b/kernel/route.go
index a1bfe3a..d0678e6 100644
--- a/kernel/route.go
+++ b/kernel/route.go
@@ -25,6 +25,7 @@ import (
        "github.com/apache/rocketmq-client-go/utils"
        "github.com/tidwall/gjson"
        "math/rand"
+       "os"
        "sort"
        "strconv"
        "strings"
@@ -34,6 +35,8 @@ import (
 )
 
 const (
+       EnvNameServerAddr = "NAMESRV_ADDR"
+
        requestTimeout   = 3 * time.Second
        defaultTopic     = "TBW102"
        defaultQueueNums = 4
@@ -52,7 +55,6 @@ var (
        // brokerName -> map[string]int32
        brokerVersionMap sync.Map
 
-       publishInfoMap sync.Map
        //subscribeInfoMap sync.Map
        routeDataMap sync.Map
        lockNamesrv  sync.Mutex
@@ -183,16 +185,6 @@ func FindBrokerAddrByName(brokerName string) string {
        return bd.(*BrokerData).BrokerAddresses[MasterId]
 }
 
-func FindTopicPublishInfo(topic string) *TopicPublishInfo {
-       tpi, exist := publishInfoMap.Load(topic)
-       if exist {
-               if tpi.(*TopicPublishInfo).isOK() {
-                       return tpi.(*TopicPublishInfo)
-               }
-       }
-       return nil
-}
-
 func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, 
onlyThisBroker bool) *FindBrokerResult {
        var (
                brokerAddr = ""
@@ -378,7 +370,7 @@ func routeData2PublishInfo(topic string, data 
*TopicRouteData) *TopicPublishInfo
 }
 
 func getNameServerAddress() string {
-       return "127.0.0.1:9876"
+       return os.Getenv(EnvNameServerAddr)
 }
 
 // TopicRouteData TopicRouteData
diff --git a/producer.go b/producer.go
deleted file mode 100644
index 0bc55ea..0000000
--- a/producer.go
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package rocketmq
-
-import (
-       "context"
-       "encoding/json"
-       "github.com/apache/rocketmq-client-go/common"
-       "github.com/apache/rocketmq-client-go/kernel"
-       "github.com/apache/rocketmq-client-go/rlog"
-       "github.com/apache/rocketmq-client-go/utils"
-       "os"
-       "strconv"
-       "strings"
-       "sync/atomic"
-       "time"
-)
-
-type Producer interface {
-       Start()
-       Send(msg *kernel.Message) (*kernel.SendResult, error)
-}
-
-type defaultProducer struct {
-       client    *kernel.RMQClient
-       state  kernel.ServiceState
-       config ProducerConfig
-}
-
-type ProducerConfig struct {
-       GroupName                  string
-       CommunicationType          int
-       RetryTimesWhenSendFailed   int
-}
-
-func (c *defaultProducer) Send(msg *kernel.Message) (*kernel.SendResult, 
error) {
-       result :=tryToFindTopicPublishInfo(msg.Topic)
-       if result != nil && result.HaveTopicRouterInfo {
-               if c.config.RetryTimesWhenSendFailed == 0 {
-                       c.config.RetryTimesWhenSendFailed = 2
-               }
-               retryTime := 1
-               if  c.config.CommunicationType == common.SYNC {
-                       retryTime = 1 + c.config.RetryTimesWhenSendFailed
-               }
-               for retryCount := 0;  retryCount < retryTime; retryCount++ {
-                       messageQueue := getMessageQueue(result)
-                       if messageQueue == nil {
-                               continue
-                       }
-
-                       brokerAddress := 
kernel.FindBrokerAddrByName(messageQueue.BrokerName)
-                       if brokerAddress == "" {
-                               break
-                       }
-                       brokerAddress = brokerVIPChannel(brokerAddress)
-                       sysFlag := 0
-                       tranMsg := msg.Properties["TRAN_MSG"]
-                       if tranMsg != "" {
-                               tranMsgBool, err :=strconv.ParseBool(tranMsg)
-                               if err == nil && tranMsgBool {
-                                       sysFlag |= 
utils.TRANSACTION_PREPARED_TYPE;
-                               }
-                               sendMsg, err := getSendMessage(msg, c, 
messageQueue, sysFlag)
-                               if err != nil {
-                                       continue
-                               }
-                               return sendKernel(brokerAddress, c, msg, 
sendMsg, messageQueue)
-                       }
-               }
-       }
-       return nil, nil
-}
-
-func sendKernel(brokerAddress string, receive *defaultProducer, msg 
*kernel.Message, sendMsg *kernel.SendMessageRequest, messageQueue 
*kernel.MessageQueue) (*kernel.SendResult, error) {
-       switch receive.config.CommunicationType{
-       case common.SYNC:
-               return receive.client.SendMessageSync(context.Background(), 
brokerAddress, messageQueue.BrokerName, sendMsg, []*kernel.Message{msg})
-       case common.ASYNC:
-               //kernel.SendMessageAsync()
-               return nil,nil
-       case common.ONEWAY:
-               return receive.client.SendMessageOneWay(context.Background(), 
brokerAddress, sendMsg, []*kernel.Message{msg})
-       default:
-               return nil,nil
-       }
-}
-
-func getSendMessage(msg *kernel.Message, receiver *defaultProducer, 
messageQueue *kernel.MessageQueue, sysFlag int) 
(*kernel.SendMessageRequest,error) {
-
-       properties, err := json.Marshal(msg.Properties)
-       maxReconsumeTimes, errtimes := 
strconv.Atoi(msg.Properties["MAX_RECONSUME_TIMES"])
-       if errtimes != nil {
-               maxReconsumeTimes = 1
-       }
-       if err == nil {
-               sendMessageRequest := &kernel.SendMessageRequest{
-                       ProducerGroup:         receiver.config.GroupName,
-                       Topic:                 msg.Topic,
-                       DefaultTopic:          "TBW102",
-                       DefaultTopicQueueNums: 8,
-                       QueueId:               int32(messageQueue.QueueId),
-                       SysFlag:                           sysFlag,
-                       BornTimestamp:         time.Now().Unix(),
-                       Flag:                              int(msg.Flag),
-                       Properties:            string(properties),
-                       ReconsumeTimes:            0,
-                       UnitMode:                          false,
-                       MaxReconsumeTimes:     maxReconsumeTimes,
-                       Batch:                 msg.Batch,
-               }
-               return sendMessageRequest,nil
-       }
-       return nil,err
-}
-
-
-func brokerVIPChannel(brokerAddr string) string {
-       var brokerAddrNew strings.Builder
-       var isChange bool
-       var err error
-       if os.Getenv("com.rocketmq.sendMessageWithVIPChannel") != "" {
-               isChange, err = 
strconv.ParseBool(os.Getenv("com.rocketmq.sendMessageWithVIPChannel"))
-               if err != nil {
-                       isChange = true
-               }
-       }else{
-               isChange = true
-       }
-       if isChange {
-       ipAndPort := strings.Split(brokerAddr, ":")
-
-       port, err :=strconv.Atoi(ipAndPort[1])
-
-       if err != nil {
-               return ""
-       }
-       brokerAddrNew.WriteString(ipAndPort[0])
-       brokerAddrNew.WriteString(":")
-       brokerAddrNew.WriteString(strconv.Itoa(port-2))
-       return brokerAddrNew.String();
-       } else {
-               return brokerAddr;
-       }
-}
-
-
-func tryToFindTopicPublishInfo(topic string) *kernel.TopicPublishInfo {
-       result := kernel.FindTopicPublishInfo(topic)
-
-       if result == nil {
-               kernel.UpdateTopicRouteInfo(topic)
-       }
-       return kernel.FindTopicPublishInfo(topic)
-}
-
-func getMessageQueue(tpInfo *kernel.TopicPublishInfo) *kernel.MessageQueue {
-       if tpInfo.MqList != nil && len(tpInfo.MqList) <= 0 {
-               rlog.Error("can not find proper message queue")
-               return nil
-       }
-       return tpInfo.MqList[int(atomic.AddInt32(&tpInfo.TopicQueueIndex, 
1))%len(tpInfo.MqList)]
-}
-
-func (c *defaultProducer) Start() {
-       c.state = kernel.StateRunning
-}
-
-func NewProducer(config ProducerConfig) Producer {
-       return &defaultProducer{
-               config: config,
-       }
-}
-
diff --git a/producer/producer.go b/producer/producer.go
new file mode 100644
index 0000000..4c58905
--- /dev/null
+++ b/producer/producer.go
@@ -0,0 +1,203 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package producer
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "github.com/apache/rocketmq-client-go/kernel"
+       "github.com/apache/rocketmq-client-go/remote"
+       "github.com/apache/rocketmq-client-go/rlog"
+       "os"
+       "sync"
+       "sync/atomic"
+       "time"
+)
+
+type Producer interface {
+       Start() error
+       Shutdown() error
+       SendSync(context.Context, *kernel.Message) (*kernel.SendResult, error)
+       //SendOneWay(context.Context, *kernel.Message) error
+}
+
+func NewProducer(opt ProducerOptions) Producer {
+       if opt.RetryTimesWhenSendFailed == 0 {
+               opt.RetryTimesWhenSendFailed = 2
+       }
+       if opt.NameServerAddr == "" {
+               rlog.Fatal("opt.NameServerAddr can't be empty")
+       }
+       err := os.Setenv(kernel.EnvNameServerAddr, opt.NameServerAddr)
+       if err != nil {
+               rlog.Fatal("set env=EnvNameServerAddr error: %s ", err.Error())
+       }
+       return &defaultProducer{
+               group:   "default",
+               client:  kernel.GetOrNewRocketMQClient(opt.ClientOption),
+               options: opt,
+       }
+}
+
+type defaultProducer struct {
+       group       string
+       client      *kernel.RMQClient
+       state       kernel.ServiceState
+       options     ProducerOptions
+       publishInfo sync.Map
+}
+
+type ProducerOptions struct {
+       kernel.ClientOption
+       NameServerAddr           string
+       GroupName                string
+       RetryTimesWhenSendFailed int
+       UnitMode                 bool
+}
+
+func (p *defaultProducer) Start() error {
+       p.state = kernel.StateRunning
+       p.client.RegisterProducer(p.group, p)
+       p.client.Start()
+       return nil
+}
+
+func (p *defaultProducer) Shutdown() error {
+       return nil
+}
+
+func (p *defaultProducer) SendSync(ctx context.Context, msg *kernel.Message) 
(*kernel.SendResult, error) {
+       if msg == nil {
+               return nil, errors.New("message is nil")
+       }
+
+       if msg.Topic == "" {
+               return nil, errors.New("topic is nil")
+       }
+
+       retryTime := 1 + p.options.RetryTimesWhenSendFailed
+
+       var (
+               err error
+       )
+       for retryCount := 0; retryCount < retryTime; retryCount++ {
+               mq := p.selectMessageQueue(msg.Topic)
+               if mq == nil {
+                       err = fmt.Errorf("the topic=%s route info not found", 
msg.Topic)
+                       continue
+               }
+
+               addr := kernel.FindBrokerAddrByName(mq.BrokerName)
+               if addr == "" {
+                       return nil, fmt.Errorf("topic=%s route info not found", 
mq.Topic)
+               }
+
+               res, _err := p.client.InvokeSync(addr, p.buildSendRequest(mq, 
msg), 3*time.Second)
+               if _err != nil {
+                       err = _err
+                       continue
+               }
+               return p.client.ProcessSendResponse(mq.BrokerName, res, msg), 
nil
+       }
+       return nil, err
+}
+
+func (p *defaultProducer) SendOneWay(context.Context, string, 
...*kernel.Message) error {
+       return nil
+}
+
+func (p *defaultProducer) buildSendRequest(mq *kernel.MessageQueue, msg 
*kernel.Message) *remote.RemotingCommand {
+       req := &kernel.SendMessageRequest{
+               ProducerGroup:  p.group,
+               Topic:          mq.Topic,
+               QueueId:        mq.QueueId,
+               SysFlag:        0,
+               BornTimestamp:  time.Now().UnixNano() / int64(time.Millisecond),
+               Flag:           msg.Flag,
+               Properties:     propertiesToString(msg.Properties),
+               ReconsumeTimes: 0,
+               UnitMode:       p.options.UnitMode,
+               Batch:          false,
+       }
+       return remote.NewRemotingCommand(kernel.ReqSendMessage, req, msg.Body)
+}
+
+func (p *defaultProducer) selectMessageQueue(topic string) 
*kernel.MessageQueue {
+       v, exist := p.publishInfo.Load(topic)
+
+       if !exist {
+               p.client.UpdatePublishInfo(topic, 
kernel.UpdateTopicRouteInfo(topic))
+               v, exist = p.publishInfo.Load(topic)
+       }
+
+       if !exist {
+               return nil
+       }
+
+       result := v.(*kernel.TopicPublishInfo)
+       if result == nil || !result.HaveTopicRouterInfo {
+               return nil
+       }
+
+       if result.MqList != nil && len(result.MqList) <= 0 {
+               rlog.Error("can not find proper message queue")
+               return nil
+       }
+       return result.MqList[int(atomic.AddInt32(&result.TopicQueueIndex, 
1))%len(result.MqList)]
+}
+
+func (p *defaultProducer) PublishTopicList() []string {
+       topics := make([]string, 0)
+       p.publishInfo.Range(func(key, value interface{}) bool {
+               topics = append(topics, key.(string))
+               return true
+       })
+       return topics
+}
+
+func (p *defaultProducer) UpdateTopicPublishInfo(topic string, info 
*kernel.TopicPublishInfo) {
+       if topic == "" || info == nil {
+               return
+       }
+       p.publishInfo.Store(topic, info)
+}
+
+func (p *defaultProducer) IsPublishTopicNeedUpdate(topic string) bool {
+       v, exist := p.publishInfo.Load(topic)
+       if !exist {
+               return true
+       }
+       info := v.(*kernel.TopicPublishInfo)
+       return info.MqList == nil || len(info.MqList) == 0
+}
+
+func (p *defaultProducer) IsUnitMode() bool {
+       return false
+}
+
+func propertiesToString(properties map[string]string) string {
+       if properties == nil {
+               return ""
+       }
+       var str string
+       for k, v := range properties {
+               str += fmt.Sprintf("%s%v%s%v", k, byte(1), v, byte(2))
+       }
+       return str
+}
diff --git a/producer_test.go b/producer_test.go
deleted file mode 100644
index 9fa466e..0000000
--- a/producer_test.go
+++ /dev/null
@@ -1,12 +0,0 @@
-package rocketmq
-
-import (
-       "testing"
-)
-
-func TestProducer_Send(t *testing.T) {
-       producer := NewProducer(ProducerConfig{
-               GroupName: "testGroup",
-       })
-       producer.Start()
-}
diff --git a/utils/messagesysflag.go b/utils/fun.go
similarity index 61%
copy from utils/messagesysflag.go
copy to utils/fun.go
index c113be1..4e9e4ea 100644
--- a/utils/messagesysflag.go
+++ b/utils/fun.go
@@ -16,31 +16,3 @@ limitations under the License.
 */
 
 package utils
-
-
-
-var(
-       COMPRESSED_FLAG = 0x1
-
-       MULTI_TAGS_FLAG = 0x1 << 1
-
-       TRANSACTION_NOT_TYPE = 0
-
-       TRANSACTION_PREPARED_TYPE = 0x1 << 2
-
-       TRANSACTION_COMMIT_TYPE = 0x2 << 2
-
-       TRANSACTION_ROLLBACK_TYPE = 0x3 << 2
-)
-
-func GetTransactionValue(flag int) int {
-       return flag & TRANSACTION_ROLLBACK_TYPE
-}
-
-func ResetTransactionValue(flag int, typeFlag int) int {
-       return (flag & (^TRANSACTION_ROLLBACK_TYPE)) | typeFlag
-}
-
-func ClearCompressedFlag(flag int) int {
-       return flag & (^COMPRESSED_FLAG)
-}
\ No newline at end of file
diff --git a/utils/messagesysflag.go b/utils/messagesysflag.go
index c113be1..589490d 100644
--- a/utils/messagesysflag.go
+++ b/utils/messagesysflag.go
@@ -17,9 +17,7 @@ limitations under the License.
 
 package utils
 
-
-
-var(
+var (
        COMPRESSED_FLAG = 0x1
 
        MULTI_TAGS_FLAG = 0x1 << 1
@@ -43,4 +41,4 @@ func ResetTransactionValue(flag int, typeFlag int) int {
 
 func ClearCompressedFlag(flag int) int {
        return flag & (^COMPRESSED_FLAG)
-}
\ No newline at end of file
+}

Reply via email to