This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new ef39b28  Native (#56)
ef39b28 is described below

commit ef39b282bb8a61bbb926ed59a57a2097d3c6b032
Author: yiduwangkai <[email protected]>
AuthorDate: Tue May 7 10:13:20 2019 +0800

    Native (#56)
    
    * rocket mq go produce client
    
    * modify error code
---
 .gitignore               |   2 +
 common/CommonConstant.go |   8 ++
 go.mod                   |   7 ++
 go.sum                   |   2 +
 kernel/client.go         |  18 ++++-
 kernel/message.go        |   1 +
 kernel/request.go        |   2 +
 kernel/route.go          |  12 ++-
 producer.go              | 189 +++++++++++++++++++++++++++++++++++++++++++++++
 producer_test.go         |  12 +++
 utils/messagesysflag.go  |  46 ++++++++++++
 11 files changed, 296 insertions(+), 3 deletions(-)

diff --git a/.gitignore b/.gitignore
index 485dee6..cedd413 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,3 @@
 .idea
+go.mod
+go.sum
\ No newline at end of file
diff --git a/common/CommonConstant.go b/common/CommonConstant.go
new file mode 100644
index 0000000..bd4e7ce
--- /dev/null
+++ b/common/CommonConstant.go
@@ -0,0 +1,8 @@
+package common
+
+const (
+       CommunicationMode = iota
+       SYNC
+       ASYNC
+       ONEWAY
+)
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 58ca7c7..b2ca040 100644
--- a/go.mod
+++ b/go.mod
@@ -13,3 +13,10 @@ require (
        github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51 // indirect
        gopkg.in/alecthomas/kingpin.v2 v2.2.6
 )
+
+replace (
+       golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 => github.com/golang/crypto v0.0.0-20180904163835-0709b304e793
+       golang.org/x/net v0.0.0-20180821023952-922f4815f713 => github.com/golang/net v0.0.0-20180826012351-8a410e7b638d
+       golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 => github.com/golang/sys v0.0.0-20180905080454-ebe1bf3edb33
+       golang.org/x/text v0.3.0 => github.com/golang/text v0.3.0
+)
diff --git a/go.sum b/go.sum
index 85b0d5e..da32c5e 100644
--- a/go.sum
+++ b/go.sum
@@ -5,6 +5,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/golang/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:uZvAcrsnNaCxlh1HorK5dUQHGmEKPh2H/Rl1kehswPo=
+github.com/golang/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:5JyrLPvD/ZdaYkT7IqKhsP5xt7aLjA99KXRtk4EIYDk=
 github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
 github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
diff --git a/kernel/client.go b/kernel/client.go
index 8403ccb..e388411 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -18,6 +18,7 @@ limitations under the License.
 package kernel
 
 import (
+       "bytes"
        "context"
        "errors"
        "fmt"
@@ -253,7 +254,7 @@ func (c *RMQClient) UpdateTopicRouteInfo() {
 // SendMessage with batch by sync
 func (c *RMQClient) SendMessageSync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
        msgs []*Message) (*SendResult, error) {
-       cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs))
+       cmd := getRemotingCommand(request, msgs)
        response, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
        if err != nil {
                rlog.Warnf("send messages with sync error: %v", err)
@@ -263,6 +264,14 @@ func (c *RMQClient) SendMessageSync(ctx context.Context, brokerAddrs, brokerName
        return c.processSendResponse(brokerName, msgs, response), nil
 }
 
+func getRemotingCommand(request *SendMessageRequest,  msgs []*Message) *remote.RemotingCommand {
+       if request.Batch {
+               return remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs))
+       } else {
+               return remote.NewRemotingCommand(ReqSendMessage, request, encodeMessages(msgs))
+       }
+}
+
 // SendMessageAsync send message with batch by async
 func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
        msgs []*Message, f func(result *SendResult)) error {
@@ -541,5 +550,10 @@ func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*MessageQueue
 }
 
 func encodeMessages(message []*Message) []byte {
-       return nil
+       var buffer bytes.Buffer
+       index := 0
+       for index < len(message){
+               buffer.Write(message[index].Body)
+       }
+       return buffer.Bytes()
 }
diff --git a/kernel/message.go b/kernel/message.go
index 117d766..0f65853 100644
--- a/kernel/message.go
+++ b/kernel/message.go
@@ -54,6 +54,7 @@ type Message struct {
        Flag          int32
        Properties    map[string]string
        TransactionId string
+       Batch         bool
 }
 
 func NewMessage(topic string, body []byte) *Message {
diff --git a/kernel/request.go b/kernel/request.go
index 36770ec..2904242 100644
--- a/kernel/request.go
+++ b/kernel/request.go
@@ -24,6 +24,7 @@ import (
 )
 
 const (
+       ReqSendMessage         = int16(10)
        ReqPullMessage             = int16(11)
        ReqQueryConsumerOffset     = int16(14)
        ReqUpdateConsumerOffset    = int16(15)
@@ -50,6 +51,7 @@ type SendMessageRequest struct {
        ReconsumeTimes        int    `json:"reconsumeTimes"`
        UnitMode              bool   `json:"unitMode"`
        MaxReconsumeTimes     int    `json:"maxReconsumeTimes"`
+       Batch                 bool   `json:"batch"`
 }
 
 func (request *SendMessageRequest) Encode() map[string]string {
diff --git a/kernel/route.go b/kernel/route.go
index 4ff4f99..6a232cf 100644
--- a/kernel/route.go
+++ b/kernel/route.go
@@ -51,7 +51,7 @@ var (
        // brokerName -> map[string]int32
        brokerVersionMap sync.Map
 
-       //publishInfoMap sync.Map
+       publishInfoMap sync.Map
        //subscribeInfoMap sync.Map
        routeDataMap sync.Map
        lockNamesrv  sync.Mutex
@@ -148,6 +148,16 @@ func FindBrokerAddrByName(brokerName string) string {
        return bd.(*BrokerData).BrokerAddresses[MasterId]
 }
 
+func FindTopicPublishInfo(topic string) *TopicPublishInfo {
+       tpi, exist := publishInfoMap.Load(topic)
+       if exist {
+               if tpi.(*TopicPublishInfo).isOK() {
+                       return tpi.(*TopicPublishInfo)
+               }
+       }
+       return nil
+}
+
 func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
        var (
                brokerAddr = ""
diff --git a/producer.go b/producer.go
new file mode 100644
index 0000000..0bc55ea
--- /dev/null
+++ b/producer.go
@@ -0,0 +1,189 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package rocketmq
+
+import (
+       "context"
+       "encoding/json"
+       "github.com/apache/rocketmq-client-go/common"
+       "github.com/apache/rocketmq-client-go/kernel"
+       "github.com/apache/rocketmq-client-go/rlog"
+       "github.com/apache/rocketmq-client-go/utils"
+       "os"
+       "strconv"
+       "strings"
+       "sync/atomic"
+       "time"
+)
+
+type Producer interface {
+       Start()
+       Send(msg *kernel.Message) (*kernel.SendResult, error)
+}
+
+type defaultProducer struct {
+       client    *kernel.RMQClient
+       state  kernel.ServiceState
+       config ProducerConfig
+}
+
+type ProducerConfig struct {
+       GroupName                  string
+       CommunicationType          int
+       RetryTimesWhenSendFailed   int
+}
+
+func (c *defaultProducer) Send(msg *kernel.Message) (*kernel.SendResult, error) {
+       result :=tryToFindTopicPublishInfo(msg.Topic)
+       if result != nil && result.HaveTopicRouterInfo {
+               if c.config.RetryTimesWhenSendFailed == 0 {
+                       c.config.RetryTimesWhenSendFailed = 2
+               }
+               retryTime := 1
+               if  c.config.CommunicationType == common.SYNC {
+                       retryTime = 1 + c.config.RetryTimesWhenSendFailed
+               }
+               for retryCount := 0;  retryCount < retryTime; retryCount++ {
+                       messageQueue := getMessageQueue(result)
+                       if messageQueue == nil {
+                               continue
+                       }
+
+                       brokerAddress := kernel.FindBrokerAddrByName(messageQueue.BrokerName)
+                       if brokerAddress == "" {
+                               break
+                       }
+                       brokerAddress = brokerVIPChannel(brokerAddress)
+                       sysFlag := 0
+                       tranMsg := msg.Properties["TRAN_MSG"]
+                       if tranMsg != "" {
+                               tranMsgBool, err :=strconv.ParseBool(tranMsg)
+                               if err == nil && tranMsgBool {
+                                       sysFlag |= utils.TRANSACTION_PREPARED_TYPE;
+                               }
+                               sendMsg, err := getSendMessage(msg, c, messageQueue, sysFlag)
+                               if err != nil {
+                                       continue
+                               }
+                               return sendKernel(brokerAddress, c, msg, sendMsg, messageQueue)
+                       }
+               }
+       }
+       return nil, nil
+}
+
+func sendKernel(brokerAddress string, receive *defaultProducer, msg *kernel.Message, sendMsg *kernel.SendMessageRequest, messageQueue *kernel.MessageQueue) (*kernel.SendResult, error) {
+       switch receive.config.CommunicationType{
+       case common.SYNC:
+               return receive.client.SendMessageSync(context.Background(), brokerAddress, messageQueue.BrokerName, sendMsg, []*kernel.Message{msg})
+       case common.ASYNC:
+               //kernel.SendMessageAsync()
+               return nil,nil
+       case common.ONEWAY:
+               return receive.client.SendMessageOneWay(context.Background(), brokerAddress, sendMsg, []*kernel.Message{msg})
+       default:
+               return nil,nil
+       }
+}
+
+func getSendMessage(msg *kernel.Message, receiver *defaultProducer, messageQueue *kernel.MessageQueue, sysFlag int) (*kernel.SendMessageRequest,error) {
+
+       properties, err := json.Marshal(msg.Properties)
+       maxReconsumeTimes, errtimes := strconv.Atoi(msg.Properties["MAX_RECONSUME_TIMES"])
+       if errtimes != nil {
+               maxReconsumeTimes = 1
+       }
+       if err == nil {
+               sendMessageRequest := &kernel.SendMessageRequest{
+                       ProducerGroup:         receiver.config.GroupName,
+                       Topic:                 msg.Topic,
+                       DefaultTopic:          "TBW102",
+                       DefaultTopicQueueNums: 8,
+                       QueueId:               int32(messageQueue.QueueId),
+                       SysFlag:                           sysFlag,
+                       BornTimestamp:         time.Now().Unix(),
+                       Flag:                              int(msg.Flag),
+                       Properties:            string(properties),
+                       ReconsumeTimes:            0,
+                       UnitMode:                          false,
+                       MaxReconsumeTimes:     maxReconsumeTimes,
+                       Batch:                 msg.Batch,
+               }
+               return sendMessageRequest,nil
+       }
+       return nil,err
+}
+
+
+func brokerVIPChannel(brokerAddr string) string {
+       var brokerAddrNew strings.Builder
+       var isChange bool
+       var err error
+       if os.Getenv("com.rocketmq.sendMessageWithVIPChannel") != "" {
+               isChange, err = strconv.ParseBool(os.Getenv("com.rocketmq.sendMessageWithVIPChannel"))
+               if err != nil {
+                       isChange = true
+               }
+       }else{
+               isChange = true
+       }
+       if isChange {
+       ipAndPort := strings.Split(brokerAddr, ":")
+
+       port, err :=strconv.Atoi(ipAndPort[1])
+
+       if err != nil {
+               return ""
+       }
+       brokerAddrNew.WriteString(ipAndPort[0])
+       brokerAddrNew.WriteString(":")
+       brokerAddrNew.WriteString(strconv.Itoa(port-2))
+       return brokerAddrNew.String();
+       } else {
+               return brokerAddr;
+       }
+}
+
+
+func tryToFindTopicPublishInfo(topic string) *kernel.TopicPublishInfo {
+       result := kernel.FindTopicPublishInfo(topic)
+
+       if result == nil {
+               kernel.UpdateTopicRouteInfo(topic)
+       }
+       return kernel.FindTopicPublishInfo(topic)
+}
+
+func getMessageQueue(tpInfo *kernel.TopicPublishInfo) *kernel.MessageQueue {
+       if tpInfo.MqList != nil && len(tpInfo.MqList) <= 0 {
+               rlog.Error("can not find proper message queue")
+               return nil
+       }
+       return tpInfo.MqList[int(atomic.AddInt32(&tpInfo.TopicQueueIndex, 1))%len(tpInfo.MqList)]
+}
+
+func (c *defaultProducer) Start() {
+       c.state = kernel.StateRunning
+}
+
+func NewProducer(config ProducerConfig) Producer {
+       return &defaultProducer{
+               config: config,
+       }
+}
+
diff --git a/producer_test.go b/producer_test.go
new file mode 100644
index 0000000..9fa466e
--- /dev/null
+++ b/producer_test.go
@@ -0,0 +1,12 @@
+package rocketmq
+
+import (
+       "testing"
+)
+
+func TestProducer_Send(t *testing.T) {
+       producer := NewProducer(ProducerConfig{
+               GroupName: "testGroup",
+       })
+       producer.Start()
+}
diff --git a/utils/messagesysflag.go b/utils/messagesysflag.go
new file mode 100644
index 0000000..c113be1
--- /dev/null
+++ b/utils/messagesysflag.go
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+
+
+var(
+       COMPRESSED_FLAG = 0x1
+
+       MULTI_TAGS_FLAG = 0x1 << 1
+
+       TRANSACTION_NOT_TYPE = 0
+
+       TRANSACTION_PREPARED_TYPE = 0x1 << 2
+
+       TRANSACTION_COMMIT_TYPE = 0x2 << 2
+
+       TRANSACTION_ROLLBACK_TYPE = 0x3 << 2
+)
+
+func GetTransactionValue(flag int) int {
+       return flag & TRANSACTION_ROLLBACK_TYPE
+}
+
+func ResetTransactionValue(flag int, typeFlag int) int {
+       return (flag & (^TRANSACTION_ROLLBACK_TYPE)) | typeFlag
+}
+
+func ClearCompressedFlag(flag int) int {
+       return flag & (^COMPRESSED_FLAG)
+}
\ No newline at end of file

Reply via email to