This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new ef39b28 Native (#56)
ef39b28 is described below
commit ef39b282bb8a61bbb926ed59a57a2097d3c6b032
Author: yiduwangkai <[email protected]>
AuthorDate: Tue May 7 10:13:20 2019 +0800
Native (#56)
* rocket mq go produce client
* modify error code
---
.gitignore | 2 +
common/CommonConstant.go | 8 ++
go.mod | 7 ++
go.sum | 2 +
kernel/client.go | 18 ++++-
kernel/message.go | 1 +
kernel/request.go | 2 +
kernel/route.go | 12 ++-
producer.go | 189 +++++++++++++++++++++++++++++++++++++++++++++++
producer_test.go | 12 +++
utils/messagesysflag.go | 46 ++++++++++++
11 files changed, 296 insertions(+), 3 deletions(-)
diff --git a/.gitignore b/.gitignore
index 485dee6..cedd413 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,3 @@
.idea
+go.mod
+go.sum
\ No newline at end of file
diff --git a/common/CommonConstant.go b/common/CommonConstant.go
new file mode 100644
index 0000000..bd4e7ce
--- /dev/null
+++ b/common/CommonConstant.go
@@ -0,0 +1,8 @@
+package common
+
+const (
+ CommunicationMode = iota
+ SYNC
+ ASYNC
+ ONEWAY
+)
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 58ca7c7..b2ca040 100644
--- a/go.mod
+++ b/go.mod
@@ -13,3 +13,10 @@ require (
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51 // indirect
gopkg.in/alecthomas/kingpin.v2 v2.2.6
)
+
+replace (
+ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 =>
github.com/golang/crypto v0.0.0-20180904163835-0709b304e793
+ golang.org/x/net v0.0.0-20180821023952-922f4815f713 =>
github.com/golang/net v0.0.0-20180826012351-8a410e7b638d
+ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 =>
github.com/golang/sys v0.0.0-20180905080454-ebe1bf3edb33
+ golang.org/x/text v0.3.0 => github.com/golang/text v0.3.0
+)
diff --git a/go.sum b/go.sum
index 85b0d5e..da32c5e 100644
--- a/go.sum
+++ b/go.sum
@@ -5,6 +5,8 @@ github.com/alecthomas/units
v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/davecgh/go-spew v1.1.0/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/golang/crypto v0.0.0-20180904163835-0709b304e793/go.mod
h1:uZvAcrsnNaCxlh1HorK5dUQHGmEKPh2H/Rl1kehswPo=
+github.com/golang/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod
h1:5JyrLPvD/ZdaYkT7IqKhsP5xt7aLjA99KXRtk4EIYDk=
github.com/emirpasic/gods v1.12.0
h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
github.com/emirpasic/gods v1.12.0/go.mod
h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
github.com/konsorten/go-windows-terminal-sequences v1.0.1
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
diff --git a/kernel/client.go b/kernel/client.go
index 8403ccb..e388411 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -18,6 +18,7 @@ limitations under the License.
package kernel
import (
+ "bytes"
"context"
"errors"
"fmt"
@@ -253,7 +254,7 @@ func (c *RMQClient) UpdateTopicRouteInfo() {
// SendMessage with batch by sync
func (c *RMQClient) SendMessageSync(ctx context.Context, brokerAddrs,
brokerName string, request *SendMessageRequest,
msgs []*Message) (*SendResult, error) {
- cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request,
encodeMessages(msgs))
+ cmd := getRemotingCommand(request, msgs)
response, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
if err != nil {
rlog.Warnf("send messages with sync error: %v", err)
@@ -263,6 +264,14 @@ func (c *RMQClient) SendMessageSync(ctx context.Context,
brokerAddrs, brokerName
return c.processSendResponse(brokerName, msgs, response), nil
}
+func getRemotingCommand(request *SendMessageRequest, msgs []*Message)
*remote.RemotingCommand {
+ if request.Batch {
+ return remote.NewRemotingCommand(ReqSendBatchMessage, request,
encodeMessages(msgs))
+ } else {
+ return remote.NewRemotingCommand(ReqSendMessage, request,
encodeMessages(msgs))
+ }
+}
+
// SendMessageAsync send message with batch by async
func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs,
brokerName string, request *SendMessageRequest,
msgs []*Message, f func(result *SendResult)) error {
@@ -541,5 +550,10 @@ func routeData2SubscribeInfo(topic string, data
*TopicRouteData) []*MessageQueue
}
func encodeMessages(message []*Message) []byte {
- return nil
+ var buffer bytes.Buffer
+ index := 0
+ for index < len(message){
+ buffer.Write(message[index].Body)
+ }
+ return buffer.Bytes()
}
diff --git a/kernel/message.go b/kernel/message.go
index 117d766..0f65853 100644
--- a/kernel/message.go
+++ b/kernel/message.go
@@ -54,6 +54,7 @@ type Message struct {
Flag int32
Properties map[string]string
TransactionId string
+ Batch bool
}
func NewMessage(topic string, body []byte) *Message {
diff --git a/kernel/request.go b/kernel/request.go
index 36770ec..2904242 100644
--- a/kernel/request.go
+++ b/kernel/request.go
@@ -24,6 +24,7 @@ import (
)
const (
+ ReqSendMessage = int16(10)
ReqPullMessage = int16(11)
ReqQueryConsumerOffset = int16(14)
ReqUpdateConsumerOffset = int16(15)
@@ -50,6 +51,7 @@ type SendMessageRequest struct {
ReconsumeTimes int `json:"reconsumeTimes"`
UnitMode bool `json:"unitMode"`
MaxReconsumeTimes int `json:"maxReconsumeTimes"`
+ Batch bool `json:"batch"`
}
func (request *SendMessageRequest) Encode() map[string]string {
diff --git a/kernel/route.go b/kernel/route.go
index 4ff4f99..6a232cf 100644
--- a/kernel/route.go
+++ b/kernel/route.go
@@ -51,7 +51,7 @@ var (
// brokerName -> map[string]int32
brokerVersionMap sync.Map
- //publishInfoMap sync.Map
+ publishInfoMap sync.Map
//subscribeInfoMap sync.Map
routeDataMap sync.Map
lockNamesrv sync.Mutex
@@ -148,6 +148,16 @@ func FindBrokerAddrByName(brokerName string) string {
return bd.(*BrokerData).BrokerAddresses[MasterId]
}
+func FindTopicPublishInfo(topic string) *TopicPublishInfo {
+ tpi, exist := publishInfoMap.Load(topic)
+ if exist {
+ if tpi.(*TopicPublishInfo).isOK() {
+ return tpi.(*TopicPublishInfo)
+ }
+ }
+ return nil
+}
+
func FindBrokerAddressInSubscribe(brokerName string, brokerId int64,
onlyThisBroker bool) *FindBrokerResult {
var (
brokerAddr = ""
diff --git a/producer.go b/producer.go
new file mode 100644
index 0000000..0bc55ea
--- /dev/null
+++ b/producer.go
@@ -0,0 +1,189 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package rocketmq
+
+import (
+ "context"
+ "encoding/json"
+ "github.com/apache/rocketmq-client-go/common"
+ "github.com/apache/rocketmq-client-go/kernel"
+ "github.com/apache/rocketmq-client-go/rlog"
+ "github.com/apache/rocketmq-client-go/utils"
+ "os"
+ "strconv"
+ "strings"
+ "sync/atomic"
+ "time"
+)
+
+type Producer interface {
+ Start()
+ Send(msg *kernel.Message) (*kernel.SendResult, error)
+}
+
+type defaultProducer struct {
+ client *kernel.RMQClient
+ state kernel.ServiceState
+ config ProducerConfig
+}
+
+type ProducerConfig struct {
+ GroupName string
+ CommunicationType int
+ RetryTimesWhenSendFailed int
+}
+
+func (c *defaultProducer) Send(msg *kernel.Message) (*kernel.SendResult,
error) {
+ result :=tryToFindTopicPublishInfo(msg.Topic)
+ if result != nil && result.HaveTopicRouterInfo {
+ if c.config.RetryTimesWhenSendFailed == 0 {
+ c.config.RetryTimesWhenSendFailed = 2
+ }
+ retryTime := 1
+ if c.config.CommunicationType == common.SYNC {
+ retryTime = 1 + c.config.RetryTimesWhenSendFailed
+ }
+ for retryCount := 0; retryCount < retryTime; retryCount++ {
+ messageQueue := getMessageQueue(result)
+ if messageQueue == nil {
+ continue
+ }
+
+ brokerAddress :=
kernel.FindBrokerAddrByName(messageQueue.BrokerName)
+ if brokerAddress == "" {
+ break
+ }
+ brokerAddress = brokerVIPChannel(brokerAddress)
+ sysFlag := 0
+ tranMsg := msg.Properties["TRAN_MSG"]
+ if tranMsg != "" {
+ tranMsgBool, err :=strconv.ParseBool(tranMsg)
+ if err == nil && tranMsgBool {
+ sysFlag |=
utils.TRANSACTION_PREPARED_TYPE;
+ }
+ sendMsg, err := getSendMessage(msg, c,
messageQueue, sysFlag)
+ if err != nil {
+ continue
+ }
+ return sendKernel(brokerAddress, c, msg,
sendMsg, messageQueue)
+ }
+ }
+ }
+ return nil, nil
+}
+
+func sendKernel(brokerAddress string, receive *defaultProducer, msg
*kernel.Message, sendMsg *kernel.SendMessageRequest, messageQueue
*kernel.MessageQueue) (*kernel.SendResult, error) {
+ switch receive.config.CommunicationType{
+ case common.SYNC:
+ return receive.client.SendMessageSync(context.Background(),
brokerAddress, messageQueue.BrokerName, sendMsg, []*kernel.Message{msg})
+ case common.ASYNC:
+ //kernel.SendMessageAsync()
+ return nil,nil
+ case common.ONEWAY:
+ return receive.client.SendMessageOneWay(context.Background(),
brokerAddress, sendMsg, []*kernel.Message{msg})
+ default:
+ return nil,nil
+ }
+}
+
+func getSendMessage(msg *kernel.Message, receiver *defaultProducer,
messageQueue *kernel.MessageQueue, sysFlag int)
(*kernel.SendMessageRequest,error) {
+
+ properties, err := json.Marshal(msg.Properties)
+ maxReconsumeTimes, errtimes :=
strconv.Atoi(msg.Properties["MAX_RECONSUME_TIMES"])
+ if errtimes != nil {
+ maxReconsumeTimes = 1
+ }
+ if err == nil {
+ sendMessageRequest := &kernel.SendMessageRequest{
+ ProducerGroup: receiver.config.GroupName,
+ Topic: msg.Topic,
+ DefaultTopic: "TBW102",
+ DefaultTopicQueueNums: 8,
+ QueueId: int32(messageQueue.QueueId),
+ SysFlag: sysFlag,
+ BornTimestamp: time.Now().Unix(),
+ Flag: int(msg.Flag),
+ Properties: string(properties),
+ ReconsumeTimes: 0,
+ UnitMode: false,
+ MaxReconsumeTimes: maxReconsumeTimes,
+ Batch: msg.Batch,
+ }
+ return sendMessageRequest,nil
+ }
+ return nil,err
+}
+
+
+func brokerVIPChannel(brokerAddr string) string {
+ var brokerAddrNew strings.Builder
+ var isChange bool
+ var err error
+ if os.Getenv("com.rocketmq.sendMessageWithVIPChannel") != "" {
+ isChange, err =
strconv.ParseBool(os.Getenv("com.rocketmq.sendMessageWithVIPChannel"))
+ if err != nil {
+ isChange = true
+ }
+ }else{
+ isChange = true
+ }
+ if isChange {
+ ipAndPort := strings.Split(brokerAddr, ":")
+
+ port, err :=strconv.Atoi(ipAndPort[1])
+
+ if err != nil {
+ return ""
+ }
+ brokerAddrNew.WriteString(ipAndPort[0])
+ brokerAddrNew.WriteString(":")
+ brokerAddrNew.WriteString(strconv.Itoa(port-2))
+ return brokerAddrNew.String();
+ } else {
+ return brokerAddr;
+ }
+}
+
+
+func tryToFindTopicPublishInfo(topic string) *kernel.TopicPublishInfo {
+ result := kernel.FindTopicPublishInfo(topic)
+
+ if result == nil {
+ kernel.UpdateTopicRouteInfo(topic)
+ }
+ return kernel.FindTopicPublishInfo(topic)
+}
+
+func getMessageQueue(tpInfo *kernel.TopicPublishInfo) *kernel.MessageQueue {
+ if tpInfo.MqList != nil && len(tpInfo.MqList) <= 0 {
+ rlog.Error("can not find proper message queue")
+ return nil
+ }
+ return tpInfo.MqList[int(atomic.AddInt32(&tpInfo.TopicQueueIndex,
1))%len(tpInfo.MqList)]
+}
+
+func (c *defaultProducer) Start() {
+ c.state = kernel.StateRunning
+}
+
+func NewProducer(config ProducerConfig) Producer {
+ return &defaultProducer{
+ config: config,
+ }
+}
+
diff --git a/producer_test.go b/producer_test.go
new file mode 100644
index 0000000..9fa466e
--- /dev/null
+++ b/producer_test.go
@@ -0,0 +1,12 @@
+package rocketmq
+
+import (
+ "testing"
+)
+
+func TestProducer_Send(t *testing.T) {
+ producer := NewProducer(ProducerConfig{
+ GroupName: "testGroup",
+ })
+ producer.Start()
+}
diff --git a/utils/messagesysflag.go b/utils/messagesysflag.go
new file mode 100644
index 0000000..c113be1
--- /dev/null
+++ b/utils/messagesysflag.go
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+
+
+var(
+ COMPRESSED_FLAG = 0x1
+
+ MULTI_TAGS_FLAG = 0x1 << 1
+
+ TRANSACTION_NOT_TYPE = 0
+
+ TRANSACTION_PREPARED_TYPE = 0x1 << 2
+
+ TRANSACTION_COMMIT_TYPE = 0x2 << 2
+
+ TRANSACTION_ROLLBACK_TYPE = 0x3 << 2
+)
+
+func GetTransactionValue(flag int) int {
+ return flag & TRANSACTION_ROLLBACK_TYPE
+}
+
+func ResetTransactionValue(flag int, typeFlag int) int {
+ return (flag & (^TRANSACTION_ROLLBACK_TYPE)) | typeFlag
+}
+
+func ClearCompressedFlag(flag int) int {
+ return flag & (^COMPRESSED_FLAG)
+}
\ No newline at end of file