This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new ddaab38 Partial implementation of Common Module (#36)
ddaab38 is described below
commit ddaab38e9b3855abb3b6745e3b12f01454b59cef
Author: wenfeng <[email protected]>
AuthorDate: Tue Mar 5 13:54:58 2019 +0800
Partial implementation of Common Module (#36)
* Adding implementation of updateRouteInfo
* Modifying APIs of native and adding some implementations
* Adding more APIs for producer/consumer in common
* go mod init
---
common/manager.go | 176 ++++++++++++++++++---
common/message.go | 2 +-
common/perm.go | 58 +++++++
common/request.go | 25 +++
common/response.go | 12 ++
common/route.go | 302 +++++++++++++++++++++++++++++++++++-
common/{route.go => transaction.go} | 2 +-
docs/zh/native-design_zh.md | 10 +-
go.mod | 12 ++
go.sum | 26 ++++
remote/codec.go | 27 ++--
11 files changed, 616 insertions(+), 36 deletions(-)
diff --git a/common/manager.go b/common/manager.go
index 21f0823..63c1896 100644
--- a/common/manager.go
+++ b/common/manager.go
@@ -18,29 +18,118 @@ limitations under the License.
package common
import (
+ "context"
"fmt"
- "sync"
+ "github.com/apache/rocketmq-client-go/remote"
+ "time"
)
-var connMap sync.Map
+const (
+ // defaultTraceRegionID is the region ID put into a SendResult when the
+ // response carries an empty MsgRegion ext field.
+ defaultTraceRegionID = "DefaultRegion"
+ // tranceOff is the TraceSwitch ext-field value that counts as "tracing
+ // disabled" when a SendResult is built.
+ tranceOff = "false"
+)
+
+// InnerProducer is the producer-side callback set accepted by
+// RegisterProducer and returned by SelectProducer.
+type InnerProducer interface {
+	PublishTopicList() []string
+	UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
+	IsPublishTopicNeedUpdate(topic string) bool
+	GetCheckListener() func(msg *MessageExt)
+	GetTransactionListener() TransactionListener
+	//CheckTransactionState()
+
+	// IsUnitMode is exported, matching InnerConsumer.IsUnitMode. An interface
+	// that contains an unexported method can only be implemented by types
+	// declared in this package, which would rule out producers that live
+	// elsewhere.
+	IsUnitMode() bool
+}
+
+// InnerConsumer is the consumer-side callback set accepted by
+// RegisterConsumer and returned by SelectConsumer.
+type InnerConsumer interface {
+ DoRebalance()
+ PersistConsumerOffset()
+ UpdateTopicSubscribeInfo(topic string, mqs []*MessageQueue)
+ IsSubscribeTopicNeedUpdate(topic string) bool
+ IsUnitMode() bool
+}
// SendMessage with batch by sync
-func SendMessage(topic string, msgs *[]Message) error {
- return nil
+// SendMessageSync sends msgs to brokerAddrs as a single SendBatchMessage
+// command through remote.InvokeSync with a fixed 3-second timeout and turns
+// the reply into a SendResult via processSendResponse. An InvokeSync error is
+// returned unchanged together with a nil result. ctx is accepted but not used
+// by the body.
+func SendMessageSync(ctx context.Context, brokerAddrs, brokerName string,
request *SendMessageRequest,
+ msgs []*Message) (*SendResult, error) {
+ cmd := remote.NewRemotingCommand(SendBatchMessage, request,
encodeMessages(msgs))
+ response, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
+ if err != nil {
+ return nil, err
+ }
+
+ return processSendResponse(brokerName, msgs, response), nil
}
// SendMessageAsync send message with batch by async
-func SendMessageAsync(topic string, msgs *[]Message, f func(result
*SendResult)) error {
+// Not implemented yet: nothing is sent, f is never called and the function
+// always returns nil.
+func SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string,
request *SendMessageRequest,
+ msgs []*Message, f func(result *SendResult)) error {
return nil
}
-// PullMessage with sync
-func PullMessage(request *PullMessageRequest) error {
+// SendMessageOneWay packs msgs into a SendBatchMessage command and hands it to
+// remote.InvokeOneWay for brokerAddrs. No SendResult is ever produced: the
+// first return value is always nil and the second is whatever InvokeOneWay
+// reported. ctx is accepted but not used by the body.
+func SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
+	msgs []*Message) (*SendResult, error) {
+	payload := encodeMessages(msgs)
+	oneWayCmd := remote.NewRemotingCommand(SendBatchMessage, request, payload)
+	return nil, remote.InvokeOneWay(brokerAddrs, oneWayCmd)
+}
+
+// encodeMessages is meant to produce the command body for a batch of
+// messages. Not implemented yet: it always returns nil.
+func encodeMessages(message []*Message) []byte {
return nil
}
+// processSendResponse converts the broker reply to a send command into a
+// SendResult.
+//
+// The response code selects the SendStatus; a code without a mapping leaves
+// the status at its zero value. The ext fields are decoded into a
+// SendMessageResponse, and one message ID per entry of msgs is read from the
+// UniqueClientMessageIdKeyindex property. An empty MsgRegion falls back to
+// defaultTraceRegionID, and tracing is reported as on for any non-empty
+// TraceSwitch value other than tranceOff.
+//
+// msgs may be empty: the topic of the returned MessageQueue is then left
+// blank rather than read from msgs[0], which would panic.
+func processSendResponse(brokerName string, msgs []*Message, cmd *remote.RemotingCommand) *SendResult {
+	var status SendStatus
+	switch cmd.Code {
+	case FlushDiskTimeout:
+		status = SendFlushDiskTimeout
+	case FlushSlaveTimeout:
+		status = SendFlushSlaveTimeout
+	case SlaveNotAvailable:
+		status = SendSlaveNotAvailable
+	case Success:
+		status = SendOK
+	default:
+		// TODO process unknown code
+	}
+
+	sendResponse := &SendMessageResponse{}
+	sendResponse.Decode(cmd.ExtFields)
+
+	// The final length is known, so allocate the ID slice once.
+	msgIDs := make([]string, 0, len(msgs))
+	for _, msg := range msgs {
+		msgIDs = append(msgIDs, msg.Properties[UniqueClientMessageIdKeyindex])
+	}
+
+	var topic string
+	if len(msgs) > 0 {
+		topic = msgs[0].Topic
+	}
+
+	regionID := cmd.ExtFields[MsgRegion]
+	if regionID == "" {
+		regionID = defaultTraceRegionID
+	}
+	trace := cmd.ExtFields[TraceSwitch]
+
+	return &SendResult{
+		Status:      status,
+		MsgIDs:      msgIDs,
+		OffsetMsgID: sendResponse.MsgId,
+		MessageQueue: &MessageQueue{
+			Topic:      topic,
+			BrokerName: brokerName,
+			QueueId:    int(sendResponse.QueueId),
+		},
+		QueueOffset:   sendResponse.QueueOffset,
+		TransactionID: sendResponse.TransactionId,
+		RegionID:      regionID,
+		TraceOn:       trace != "" && trace != tranceOff,
+	}
+}
+
+// PullMessage with sync
+// Not implemented yet: no request is issued and the function always returns
+// (nil, nil), so callers cannot tell success from "nothing pulled".
+func PullMessage(ctx context.Context, brokerAddrs string, request
*PullMessageRequest) (*PullResult, error) {
+ return nil, nil
+}
+
// PullMessageAsync pull message async
-func PullMessageAsync(request *PullMessageRequest, f func(result *PullResult))
error {
+// Not implemented yet: no request is issued, f is never called and the
+// function always returns nil.
+func PullMessageAsync(ctx context.Context, brokerAddrs string, request
*PullMessageRequest, f func(result *PullResult)) error {
return nil
}
@@ -76,20 +165,20 @@ const (
// SendResult rocketmq send result
type SendResult struct {
- sendStatus SendStatus
- msgID string
- messageQueue MessageQueue
- queueOffset int64
- transactionID string
- offsetMsgID string
- regionID string
- traceOn bool
+ Status SendStatus
+ MsgIDs []string
+ MessageQueue *MessageQueue
+ QueueOffset int64
+ TransactionID string
+ OffsetMsgID string
+ RegionID string
+ TraceOn bool
}
// SendResult send message result to string(detail result)
func (result *SendResult) String() string {
- return fmt.Sprintf("SendResult [sendStatus=%d, msgId=%s,
offsetMsgId=%s, queueOffset=%d, messageQueue=%s]",
- result.sendStatus, result.msgID, result.offsetMsgID,
result.queueOffset, result.messageQueue.String())
+ return fmt.Sprintf("SendResult [sendStatus=%d, msgIds=%s,
offsetMsgId=%s, queueOffset=%d, messageQueue=%s]",
+ result.Status, result.MsgIDs, result.OffsetMsgID,
result.QueueOffset, result.MessageQueue.String())
}
// PullResult the pull result
@@ -117,9 +206,58 @@ const (
type MessageQueue struct {
Topic string `json:"topic"`
BrokerName string `json:"brokerName"`
- QueueId int32 `json:"queueId"`
+ QueueId int `json:"queueId"`
}
func (mq *MessageQueue) String() string {
return fmt.Sprintf("MessageQueue [topic=%s, brokerName=%s,
queueId=%d]", mq.Topic, mq.BrokerName, mq.QueueId)
}
+
+// CheckClientInBroker is a placeholder; the body is empty.
+func CheckClientInBroker() {
+
+}
+
+// SendHeartbeatToAllBrokerWithLock is a placeholder; the body is empty.
+func SendHeartbeatToAllBrokerWithLock() {
+
+}
+
+// UpdateTopicRouteInfoFromNameServer is a placeholder; the body is empty.
+func UpdateTopicRouteInfoFromNameServer(topic string) {
+}
+
+// RegisterConsumer is a placeholder; consumer is not stored anywhere yet.
+func RegisterConsumer(group string, consumer InnerConsumer) {
+
+}
+
+// UnregisterConsumer is a placeholder; the body is empty.
+func UnregisterConsumer(group string) {
+
+}
+
+// RegisterProducer is a placeholder; producer is not stored anywhere yet.
+func RegisterProducer(group string, producer InnerProducer) {
+
+}
+
+// UnregisterProducer is a placeholder; the body is empty.
+func UnregisterProducer(group string) {
+
+}
+
+// SelectProducer is a placeholder; it always returns nil.
+func SelectProducer(group string) InnerProducer {
+ return nil
+}
+
+// SelectConsumer is a placeholder; it always returns nil.
+func SelectConsumer(group string) InnerConsumer {
+ return nil
+}
+
+// FindBrokerAddressInPublish is a placeholder; it has no return value and an
+// empty body.
+func FindBrokerAddressInPublish(brokerName string) {
+
+}
+
+// FindBrokerAddressInSubscribe is a placeholder; it always returns nil.
+func FindBrokerAddressInSubscribe(brokerName string, brokerId int64,
onlyThisBroker bool) *FindBrokerResult {
+ return nil
+}
+
+// FindBrokerResult is the return type of FindBrokerAddressInSubscribe. All
+// fields are unexported and nothing in the visible code populates them yet.
+type FindBrokerResult struct {
+ brokerAddr string
+ slave bool
+ brokerVersion int32
+}
diff --git a/common/message.go b/common/message.go
index e7b64ae..08ead01 100644
--- a/common/message.go
+++ b/common/message.go
@@ -40,7 +40,7 @@ const (
ReconsumeTime = "RECONSUME_TIME"
MsgRegion = "MSG_REGION"
TraceSwitch = "TRACE_ON"
- UniqeClientMessageIdKeyindex = "UNIQ_KEY"
+ UniqueClientMessageIdKeyindex = "UNIQ_KEY"
MaxReconsumeTimes = "MAX_RECONSUME_TIMES"
ConsumeStartTime = "CONSUME_START_TIME"
TranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET"
diff --git a/common/perm.go b/common/perm.go
new file mode 100644
index 0000000..13f1e7e
--- /dev/null
+++ b/common/perm.go
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+// Permission bits of a queue's perm value. Each constant is a single bit, so
+// they can be combined with | and tested with &.
+const (
+ // permPriority is bit 3; no helper in this file reads it.
+ permPriority = 0x1 << 3
+ // permRead is bit 2, tested by isReadable.
+ permRead = 0x1 << 2
+ // permWrite is bit 1, tested by isWriteable.
+ permWrite = 0x1 << 1
+ // permInherit is bit 0, tested by isInherited.
+ permInherit = 0x1 << 0
+)
+
+// isReadable reports whether the permRead bit is set in perm.
+func isReadable(perm int) bool {
+	return perm&permRead != 0
+}
+
+// isWriteable reports whether the permWrite bit is set in perm.
+func isWriteable(perm int) bool {
+	return perm&permWrite != 0
+}
+
+// isInherited reports whether the permInherit bit is set in perm.
+func isInherited(perm int) bool {
+	return perm&permInherit != 0
+}
+
+// perm2string renders perm as a three-character string: 'R' in position 0
+// when readable, 'W' in position 1 when writeable, 'X' in position 2 when
+// inherited, and '-' for every flag that is not set.
+func perm2string(perm int) string {
+	flags := []byte("---")
+
+	if isReadable(perm) {
+		flags[0] = 'R'
+	}
+	if isWriteable(perm) {
+		flags[1] = 'W'
+	}
+	if isInherited(perm) {
+		flags[2] = 'X'
+	}
+
+	return string(flags)
+}
diff --git a/common/request.go b/common/request.go
index 48be702..83d74a0 100644
--- a/common/request.go
+++ b/common/request.go
@@ -17,6 +17,11 @@ limitations under the License.
package common
+// Request codes handed to remote.NewRemotingCommand.
+const (
+ // GetRouteInfoByTopic is used for the name-server route query.
+ GetRouteInfoByTopic = int16(105)
+ // SendBatchMessage is used by SendMessageSync and SendMessageOneWay.
+ SendBatchMessage = int16(320)
+)
+
type SendMessageRequest struct {
ProducerGroup string `json:"producerGroup"`
Topic string `json:"topic"`
@@ -32,6 +37,14 @@ type SendMessageRequest struct {
MaxReconsumeTimes int `json:"maxReconsumeTimes"`
}
+// Encode is meant to render the request as remoting ext fields. Not
+// implemented yet: it always returns nil, so commands built from a
+// SendMessageRequest carry no header fields.
+func (request *SendMessageRequest) Encode() map[string]string {
+ return nil
+}
+
+// Decode is meant to fill the request from ext fields. Not implemented yet:
+// properties is ignored and nil is returned.
+func (request *SendMessageRequest) Decode(properties map[string]string) error {
+ return nil
+}
+
type PullMessageRequest struct {
ConsumerGroup string `json:"consumerGroup"`
Topic string `json:"topic"`
@@ -68,3 +81,15 @@ type UpdateConsumerOffsetRequest struct {
QueueId int32 `json:"queueId"`
CommitOffset int64 `json:"commitOffset"`
}
+
+// GetRouteInfoRequest is the header of a GetRouteInfoByTopic command.
+type GetRouteInfoRequest struct {
+	Topic string `json:"topic"`
+}
+
+// Encode renders the header as ext fields, using the same key as the struct's
+// JSON tag. Returning nil here would send the route query without telling the
+// server which topic is meant.
+func (request *GetRouteInfoRequest) Encode() map[string]string {
+	return map[string]string{"topic": request.Topic}
+}
+
+// Decode is the inverse of Encode: it copies the "topic" entry of properties
+// into the request. A missing entry leaves Topic untouched. The returned
+// error is always nil.
+func (request *GetRouteInfoRequest) Decode(properties map[string]string) error {
+	if topic, ok := properties["topic"]; ok {
+		request.Topic = topic
+	}
+	return nil
+}
diff --git a/common/response.go b/common/response.go
index d4d96e5..0fc0101 100644
--- a/common/response.go
+++ b/common/response.go
@@ -17,6 +17,14 @@ limitations under the License.
package common
+// Response codes compared against RemotingCommand.Code. processSendResponse
+// maps Success, FlushDiskTimeout, SlaveNotAvailable and FlushSlaveTimeout to a
+// SendStatus; the route query turns TopicNotExist into ErrTopicNotExist.
+const (
+ Success = int16(0)
+ FlushDiskTimeout = int16(10)
+ SlaveNotAvailable = int16(11)
+ FlushSlaveTimeout = int16(12)
+ TopicNotExist = int16(17)
+)
+
type SendMessageResponse struct {
MsgId string
QueueId int32
@@ -25,6 +33,10 @@ type SendMessageResponse struct {
MsgRegion string
}
+// Decode is meant to fill the response from the ext fields of a reply. Not
+// implemented yet: properties is ignored, so every field keeps its zero
+// value.
+func (response *SendMessageResponse) Decode(properties map[string]string) {
+
+}
+
type PullMessageResponse struct {
SuggestWhichBrokerId int64
NextBeginOffset int64
diff --git a/common/route.go b/common/route.go
index 8b79e12..33f25e1 100644
--- a/common/route.go
+++ b/common/route.go
@@ -17,5 +17,305 @@ limitations under the License.
package common
-type RouteInfo struct {
+import (
+ "encoding/json"
+ "github.com/apache/rocketmq-client-go/remote"
+ "github.com/pkg/errors"
+ log "github.com/sirupsen/logrus"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+const (
+	// requestTimeout bounds the name-server route query. It is passed where a
+	// time.Duration is expected, so it must carry a unit: a bare 3000 would
+	// mean 3000 nanoseconds, not 3000 milliseconds.
+	requestTimeout   = 3 * time.Second
+	defaultTopic     = "TBW102"
+	defaultQueueNums = 4
+	// masterId is the key looked up in BrokerData.brokerAddresses when
+	// publish info is built.
+	masterId = int64(0)
+)
+
+var (
+ // ErrTopicNotExist is returned by the route query when the server answers
+ // with the TopicNotExist code.
+ ErrTopicNotExist = errors.New("topic not exist")
+)
+
+var (
+ // brokerAddressesMap is written by updateTopicRouteInfo: broker name ->
+ // that broker's brokerAddresses map.
+ brokerAddressesMap sync.Map
+ // publishInfoMap is read as topic -> *TopicPublishInfo.
+ publishInfoMap sync.Map
+ // routeDataMap is read as topic -> *topicRouteData.
+ routeDataMap sync.Map
+ // lockNamesrv is held for the whole of updateTopicRouteInfo.
+ lockNamesrv sync.Mutex
+)
+
+// TopicPublishInfo is the per-topic publish state; publishInfoMap holds one
+// per topic.
+type TopicPublishInfo struct {
+ // OrderTopic is set when the info was built from a non-empty OrderTopicConf.
+ OrderTopic bool
+ // HaveTopicRouterInfo is set by updateTopicRouteInfo after a route query.
+ HaveTopicRouterInfo bool
+ // MqList is the list of queues built by RouteData2PublishInfo.
+ MqList []*MessageQueue
+ // RouteData is the route data the info was built from.
+ RouteData *topicRouteData
+ // TopicQueueIndex is the counter advanced atomically by fetchQueueIndex.
+ TopicQueueIndex int32
+}
+
+// isOK reports whether the info holds at least one message queue.
+func (info *TopicPublishInfo) isOK() (bIsTopicOk bool) {
+	return len(info.MqList) != 0
+}
+
+// fetchQueueIndex atomically advances TopicQueueIndex and maps it onto an
+// index into MqList. It returns -1 when MqList is empty.
+//
+// The counter is an int32 and eventually wraps to a negative value; taking
+// the remainder of a negative number would give a negative (out-of-range)
+// index. Reinterpreting the counter as uint32 first keeps the result in
+// [0, len(MqList)) for every counter value and is identical to the plain
+// remainder while the counter is still positive.
+func (info *TopicPublishInfo) fetchQueueIndex() int {
+	length := len(info.MqList)
+	if length <= 0 {
+		return -1
+	}
+	qIndex := atomic.AddInt32(&info.TopicQueueIndex, 1)
+	return int(uint32(qIndex) % uint32(length))
+}
+
+// tryToFindTopicPublishInfo looks topic up in publishInfoMap. When there is
+// no entry, or the entry has no queues, it triggers updateTopicRouteInfo and
+// looks again. An entry that has route info or queues is returned directly;
+// otherwise whatever the map holds at that moment is returned, or nil when
+// the topic is still unknown.
+func tryToFindTopicPublishInfo(topic string) *TopicPublishInfo {
+	var info *TopicPublishInfo
+	if cached, ok := publishInfoMap.Load(topic); ok {
+		info = cached.(*TopicPublishInfo)
+	}
+
+	if info == nil || !info.isOK() {
+		updateTopicRouteInfo(topic)
+		if refreshed, ok := publishInfoMap.Load(topic); ok {
+			info = refreshed.(*TopicPublishInfo)
+		} else {
+			info = &TopicPublishInfo{HaveTopicRouterInfo: false}
+		}
+	}
+
+	if info.HaveTopicRouterInfo || info.isOK() {
+		return info
+	}
+
+	if latest, ok := publishInfoMap.Load(topic); ok {
+		return latest.(*TopicPublishInfo)
+	}
+	return nil
+}
+
+func updateTopicRouteInfo(topic string) {
+ // Todo process lock timeout
+ lockNamesrv.Lock()
+ defer lockNamesrv.Unlock()
+
+ RouteData, err := queryTopicRouteInfoFromServer(topic, requestTimeout)
+ if err != nil {
+ log.Warningf("query topic route from server error: %s", err)
+ return
+ }
+
+ if RouteData == nil {
+ log.Warningf("queryTopicRouteInfoFromServer return nil, Topic:
%s", topic)
+ return
+ }
+
+ var changed bool
+ oldRouteData, exist := routeDataMap.Load(topic)
+ if !exist || RouteData == nil {
+ changed = true
+ } else {
+ changed =
topicRouteDataIsChange(oldRouteData.(*topicRouteData), RouteData)
+ }
+
+ if !changed {
+ changed = isNeedUpdateTopicRouteInfo(topic)
+ } else {
+ log.Infof("the topic[%s] route info changed, old[%s] ,new[%s]",
topic, oldRouteData, RouteData)
+ }
+
+ if !changed {
+ return
+ }
+
+ newTopicRouteData := RouteData.clone()
+
+ for _, brokerData := range newTopicRouteData.brokerDataList {
+ brokerAddressesMap.Store(brokerData.brokerName,
brokerData.brokerAddresses)
+ }
+
+ // update publish info
+ publishInfo := RouteData2PublishInfo(topic, RouteData)
+ publishInfo.HaveTopicRouterInfo = true
+
+ old, _ := publishInfoMap.Load(topic)
+ publishInfoMap.Store(topic, publishInfoMap)
+ if old != nil {
+ log.Infof("Old TopicPublishInfo [%s] removed.", old)
+ }
+}
+
+// queryTopicRouteInfoFromServer sends a GetRouteInfoByTopic command for topic
+// to the address returned by getNameServerAddress and decodes the JSON body
+// of a Success reply into a topicRouteData.
+//
+// Errors: the InvokeSync error as-is; ErrTopicNotExist for a TopicNotExist
+// reply; an error carrying the reply's Remark for any other code or for a
+// Success reply without a body; the json.Unmarshal error (also logged) for an
+// undecodable body.
+func queryTopicRouteInfoFromServer(topic string, timeout time.Duration) (*topicRouteData, error) {
+	header := &GetRouteInfoRequest{Topic: topic}
+	cmd := remote.NewRemotingCommand(GetRouteInfoByTopic, header, nil)
+
+	response, err := remote.InvokeSync(getNameServerAddress(), cmd, timeout)
+	if err != nil {
+		return nil, err
+	}
+
+	if response.Code == TopicNotExist {
+		return nil, ErrTopicNotExist
+	}
+	if response.Code != Success || response.Body == nil {
+		return nil, errors.New(response.Remark)
+	}
+
+	route := &topicRouteData{}
+	if err = json.Unmarshal(response.Body, route); err != nil {
+		log.Warningf("unmarshal topicRouteData error: %s", err)
+		return nil, err
+	}
+	return route, nil
+}
+
+// topicRouteDataIsChange reports whether newData differs from oldData. A nil
+// argument always counts as a change. Otherwise both sides are cloned, the
+// clones' queue and broker lists are sorted by broker name in descending
+// order, and the clones are compared with equals; the arguments' own slices
+// are not reordered.
+func topicRouteDataIsChange(oldData *topicRouteData, newData *topicRouteData) bool {
+	if oldData == nil || newData == nil {
+		return true
+	}
+
+	// sortedClone returns a clone of src with both lists ordered by broker
+	// name, largest first.
+	sortedClone := func(src *topicRouteData) *topicRouteData {
+		c := src.clone()
+		sort.Slice(c.queueDataList, func(a, b int) bool {
+			return c.queueDataList[a].brokerName > c.queueDataList[b].brokerName
+		})
+		sort.Slice(c.brokerDataList, func(a, b int) bool {
+			return c.brokerDataList[a].brokerName > c.brokerDataList[b].brokerName
+		})
+		return c
+	}
+
+	oldSorted := sortedClone(oldData)
+	newSorted := sortedClone(newData)
+	return !oldSorted.equals(newSorted)
+}
+
+// isNeedUpdateTopicRouteInfo reports whether the publish info of topic has to
+// be rebuilt even though the route itself did not change: that is the case
+// when there is no usable entry in publishInfoMap or the entry has no
+// queues. An entry that already is OK does not need an update.
+func isNeedUpdateTopicRouteInfo(topic string) bool {
+	value, exist := publishInfoMap.Load(topic)
+	if !exist {
+		return true
+	}
+	// Checked assertion: an entry of an unexpected type is treated as
+	// "needs update" instead of panicking.
+	info, ok := value.(*TopicPublishInfo)
+	return !ok || info == nil || !info.isOK()
+}
+
+// RouteData2PublishInfo builds the TopicPublishInfo of topic from data.
+//
+// When data.OrderTopicConf is set it is parsed as ';'-separated
+// "brokerName:queueCount" items and one MessageQueue per queue is emitted in
+// that order; the result is marked OrderTopic. Items that lack the count or
+// whose count is not a number are logged and skipped.
+//
+// Otherwise the queue data is sorted by broker name (in place, on
+// data.queueDataList) and, for every writeable entry whose broker has a
+// non-empty address under masterId, writeQueueNums queues are emitted.
+func RouteData2PublishInfo(topic string, data *topicRouteData) *TopicPublishInfo {
+	publishInfo := &TopicPublishInfo{
+		RouteData:  data,
+		OrderTopic: false,
+	}
+
+	if data.OrderTopicConf != "" {
+		for _, broker := range strings.Split(data.OrderTopicConf, ";") {
+			item := strings.Split(broker, ":")
+			if len(item) < 2 {
+				log.Warningf("skip malformed order topic conf item %q of topic %s", broker, topic)
+				continue
+			}
+			nums, err := strconv.Atoi(item[1])
+			if err != nil {
+				log.Warningf("skip order topic conf item %q of topic %s: %s", broker, topic, err)
+				continue
+			}
+			for i := 0; i < nums; i++ {
+				publishInfo.MqList = append(publishInfo.MqList, &MessageQueue{
+					Topic:      topic,
+					BrokerName: item[0],
+					QueueId:    i,
+				})
+			}
+		}
+
+		publishInfo.OrderTopic = true
+		return publishInfo
+	}
+
+	qds := data.queueDataList
+	// Order by broker name. The less function must compare elements; a
+	// comparison of the indices themselves is not a valid ordering and
+	// leaves the result unspecified.
+	sort.SliceStable(qds, func(i, j int) bool {
+		return qds[i].brokerName < qds[j].brokerName
+	})
+
+	for _, qd := range qds {
+		if !isWriteable(qd.perm) {
+			continue
+		}
+
+		var bData *BrokerData
+		for _, bd := range data.brokerDataList {
+			if bd.brokerName == qd.brokerName {
+				bData = bd
+				break
+			}
+		}
+
+		// Only brokers with a known master address can be published to.
+		if bData == nil || bData.brokerAddresses[masterId] == "" {
+			continue
+		}
+
+		for i := 0; i < qd.writeQueueNums; i++ {
+			publishInfo.MqList = append(publishInfo.MqList, &MessageQueue{
+				Topic:      topic,
+				BrokerName: qd.brokerName,
+				QueueId:    i,
+			})
+		}
+	}
+
+	return publishInfo
+}
+
+// getNameServerAddress supplies the address used for the route query. Not
+// implemented yet: it always returns the empty string.
+func getNameServerAddress() string {
+ return ""
+}
+
+// topicRouteData is the route of one topic: an optional order-topic
+// configuration string plus the queue and broker descriptions.
+//
+// NOTE(review): the route query fills this type with json.Unmarshal, and
+// encoding/json skips unexported fields, so queueDataList and brokerDataList
+// stay empty after decoding as the type stands - confirm and export/tag the
+// fields if that is not intended.
+type topicRouteData struct {
+ OrderTopicConf string
+ queueDataList []*QueueData
+ brokerDataList []*BrokerData
+}
+
+// clone returns a copy of the route with freshly allocated queue and broker
+// slices. The copy is shallow: the slices hold the same *QueueData and
+// *BrokerData pointers as the receiver.
+func (RouteData *topicRouteData) clone() *topicRouteData {
+	queues := make([]*QueueData, len(RouteData.queueDataList))
+	copy(queues, RouteData.queueDataList)
+
+	brokers := make([]*BrokerData, len(RouteData.brokerDataList))
+	copy(brokers, RouteData.brokerDataList)
+
+	return &topicRouteData{
+		OrderTopicConf: RouteData.OrderTopicConf,
+		queueDataList:  queues,
+		brokerDataList: brokers,
+	}
+}
+
+// equals reports whether both routes carry the same OrderTopicConf and the
+// same queue and broker entries at the same positions. Callers that do not
+// care about order must sort both sides the same way first. Two nil routes
+// are equal; a nil and a non-nil route are not.
+//
+// Queue entries are compared field by field. Broker entries are compared by
+// name and by the content of their address maps; brokerAddressesLock is not
+// taken here.
+func (RouteData *topicRouteData) equals(data *topicRouteData) bool {
+	if RouteData == nil || data == nil {
+		return RouteData == data
+	}
+	if RouteData.OrderTopicConf != data.OrderTopicConf ||
+		len(RouteData.queueDataList) != len(data.queueDataList) ||
+		len(RouteData.brokerDataList) != len(data.brokerDataList) {
+		return false
+	}
+
+	for i, qd := range RouteData.queueDataList {
+		other := data.queueDataList[i]
+		if qd == other {
+			continue // same pointer (or both nil)
+		}
+		if qd == nil || other == nil || *qd != *other {
+			return false
+		}
+	}
+
+	for i, bd := range RouteData.brokerDataList {
+		other := data.brokerDataList[i]
+		if bd == other {
+			continue // same pointer (or both nil)
+		}
+		if bd == nil || other == nil ||
+			bd.brokerName != other.brokerName ||
+			len(bd.brokerAddresses) != len(other.brokerAddresses) {
+			return false
+		}
+		for id, addr := range bd.brokerAddresses {
+			if otherAddr, ok := other.brokerAddresses[id]; !ok || otherAddr != addr {
+				return false
+			}
+		}
+	}
+
+	return true
+}
+
+// QueueData describes the queues of one topic on one broker.
+type QueueData struct {
+ // brokerName links the entry to the BrokerData with the same name.
+ brokerName string
+ readQueueNums int
+ // writeQueueNums is the number of MessageQueue entries generated for this
+ // broker when publish info is built.
+ writeQueueNums int
+ // perm is a permission bit set; it is tested with isWriteable.
+ perm int
+ topicSynFlag int
+}
+
+// BrokerData describes one named broker and its addresses.
+type BrokerData struct {
+ brokerName string
+ // brokerAddresses is keyed by an int64 ID; the entry under masterId is the
+ // one checked when publish info is built.
+ brokerAddresses map[int64]string
+ // brokerAddressesLock is not taken by any code visible in this change.
+ // Because of this field a BrokerData must not be copied by value.
+ brokerAddressesLock sync.RWMutex
}
diff --git a/common/route.go b/common/transaction.go
similarity index 95%
copy from common/route.go
copy to common/transaction.go
index 8b79e12..e526af5 100644
--- a/common/route.go
+++ b/common/transaction.go
@@ -17,5 +17,5 @@ limitations under the License.
package common
-type RouteInfo struct {
+type TransactionListener interface {
}
diff --git a/docs/zh/native-design_zh.md b/docs/zh/native-design_zh.md
index e5c96ee..dd407a4 100644
--- a/docs/zh/native-design_zh.md
+++ b/docs/zh/native-design_zh.md
@@ -43,11 +43,11 @@
NewRemotingCommand(code int16, header CustomHeader) *RemotingCommand
// send a request to servers and return until response received.
-InvokeSync(addr string, request *RemotingCommand, timeout time.Duration)
(*RemotingCommand, error)
+SendMessageSync(ctx context.Context, brokerAddrs, brokerName string, request
*SendMessageRequest, msgs []*Message) (*SendResult, error)
-InvokeAsync(addr string, request *RemotingCommand, timeout time.Duration, f
func(*RemotingCommand)) error
+SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, request
*SendMessageRequest, msgs []*Message, f func(result *SendResult)) error
-InvokeOneWay(addr string, request *RemotingCommand, timeout time.Duration)
error
+SendMessageOneWay(ctx context.Context, brokerAddrs string, request
*SendMessageRequest, msgs []*Message) (*SendResult, error)
```
### common
@@ -61,10 +61,10 @@ SendMessage(topic string, msgs *[]Message) error
SendMessageAsync(topic string, msgs *[]Message, f func(result *SendResult))
error
// PullMessage with sync
-PullMessage(request *PullMessageRequest) error
+PullMessage(ctx context.Context, brokerAddrs string, request
*PullMessageRequest) (*PullResult, error)
// PullMessageAsync pull message async
-PullMessageAsync(request *PullMessageRequest, f func(result *PullResult))
error
+PullMessageAsync(ctx context.Context, brokerAddrs string, request
*PullMessageRequest, f func(result *PullResult)) error
// QueryMaxOffset with specific queueId and topic
QueryMaxOffset(topic string, queueId int) error
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..809e20a
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,12 @@
+module github.com/apache/rocketmq-client-go
+
+go 1.12
+
+require (
+ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc //
indirect
+ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf //
indirect
+ github.com/pkg/errors v0.8.1
+ github.com/sirupsen/logrus v1.3.0
+ github.com/stretchr/testify v1.3.0
+ gopkg.in/alecthomas/kingpin.v2 v2.2.6
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..62f4cad
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,26 @@
+github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc
h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
+github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf
h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
+github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod
h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+github.com/davecgh/go-spew v1.1.0/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
+github.com/pkg/errors v0.8.1/go.mod
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/sirupsen/logrus v1.3.0
h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
+github.com/sirupsen/logrus v1.3.0/go.mod
h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/stretchr/objx v0.1.0/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.1.1/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.2.2/go.mod
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0
h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793
h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33
h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+gopkg.in/alecthomas/kingpin.v2 v2.2.6
h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
+gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod
h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
diff --git a/remote/codec.go b/remote/codec.go
index ca50fa6..d556cfb 100644
--- a/remote/codec.go
+++ b/remote/codec.go
@@ -33,7 +33,7 @@ const (
// 1, RPC
RPCOneWay = 1
- //ResponseType for reponse
+ //ResponseType for response
ResponseType = 1
_Flag = 0
@@ -53,15 +53,24 @@ type RemotingCommand struct {
Body []byte `json:"-"`
}
-func NewRemotingCommand(code int16, properties map[string]string, body []byte)
*RemotingCommand {
- return &RemotingCommand{
- Code: code,
- Language: _LanguageCode,
- Version: _Version,
- Opaque: atomic.AddInt32(&opaque, 1),
- ExtFields: properties,
- Body: body,
+// CustomHeader is a request header that can render itself as the ExtFields
+// of a RemotingCommand; NewRemotingCommand calls Encode on it.
+type CustomHeader interface {
+ Encode() map[string]string
+}
+
+// NewRemotingCommand builds a command with the given code and body, the
+// package's language code and version, and the next value of the opaque
+// counter. When header is non-nil its Encode result becomes ExtFields;
+// otherwise ExtFields stays nil.
+func NewRemotingCommand(code int16, header CustomHeader, body []byte)
*RemotingCommand {
+ cmd := &RemotingCommand{
+ Code: code,
+ Language: _LanguageCode,
+ Version: _Version,
+ Opaque: atomic.AddInt32(&opaque, 1),
+ Body: body,
}
+
+ if header != nil {
+ cmd.ExtFields = header.Encode()
+ }
+
+ return cmd
}
func (command *RemotingCommand) isResponseType() bool {