This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new ddaab38  Partial implementation of Common Module (#36)
ddaab38 is described below

commit ddaab38e9b3855abb3b6745e3b12f01454b59cef
Author: wenfeng <[email protected]>
AuthorDate: Tue Mar 5 13:54:58 2019 +0800

    Partial implementation of Common Module (#36)
    
    * Adding implementation of updateRouteInfo
    
    * Modifying APIs of native and adding some implementations
    
    * Adding more APIs for producer/consumer in common
    
    * go mod init
---
 common/manager.go                   | 176 ++++++++++++++++++---
 common/message.go                   |   2 +-
 common/perm.go                      |  58 +++++++
 common/request.go                   |  25 +++
 common/response.go                  |  12 ++
 common/route.go                     | 302 +++++++++++++++++++++++++++++++++++-
 common/{route.go => transaction.go} |   2 +-
 docs/zh/native-design_zh.md         |  10 +-
 go.mod                              |  12 ++
 go.sum                              |  26 ++++
 remote/codec.go                     |  27 ++--
 11 files changed, 616 insertions(+), 36 deletions(-)

diff --git a/common/manager.go b/common/manager.go
index 21f0823..63c1896 100644
--- a/common/manager.go
+++ b/common/manager.go
@@ -18,29 +18,118 @@ limitations under the License.
 package common
 
 import (
+       "context"
        "fmt"
-       "sync"
+       "github.com/apache/rocketmq-client-go/remote"
+       "time"
 )
 
-var connMap sync.Map
+const (
+       defaultTraceRegionID = "DefaultRegion"
+       tranceOff            = "false"
+)
+
+type InnerProducer interface {
+       PublishTopicList() []string
+       UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
+       IsPublishTopicNeedUpdate(topic string) bool
+       GetCheckListener() func(msg *MessageExt)
+       GetTransactionListener() TransactionListener
+       //CheckTransactionState()
+       isUnitMode() bool
+}
+
+type InnerConsumer interface {
+       DoRebalance()
+       PersistConsumerOffset()
+       UpdateTopicSubscribeInfo(topic string, mqs []*MessageQueue)
+       IsSubscribeTopicNeedUpdate(topic string) bool
+       IsUnitMode() bool
+}
 
 // SendMessage with batch by sync
-func SendMessage(topic string, msgs *[]Message) error {
-       return nil
+func SendMessageSync(ctx context.Context, brokerAddrs, brokerName string, 
request *SendMessageRequest,
+       msgs []*Message) (*SendResult, error) {
+       cmd := remote.NewRemotingCommand(SendBatchMessage, request, 
encodeMessages(msgs))
+       response, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
+       if err != nil {
+               return nil, err
+       }
+
+       return processSendResponse(brokerName, msgs, response), nil
 }
 
 // SendMessageAsync send message with batch by async
-func SendMessageAsync(topic string, msgs *[]Message, f func(result 
*SendResult)) error {
+func SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, 
request *SendMessageRequest,
+       msgs []*Message, f func(result *SendResult)) error {
        return nil
 }
 
-// PullMessage with sync
-func PullMessage(request *PullMessageRequest) error {
+func SendMessageOneWay(ctx context.Context, brokerAddrs string, request 
*SendMessageRequest,
+       msgs []*Message) (*SendResult, error) {
+       cmd := remote.NewRemotingCommand(SendBatchMessage, request, 
encodeMessages(msgs))
+       err := remote.InvokeOneWay(brokerAddrs, cmd)
+       return nil, err
+}
+
+func encodeMessages(message []*Message) []byte {
        return nil
 }
 
+func processSendResponse(brokerName string, msgs []*Message, cmd 
*remote.RemotingCommand) *SendResult {
+       var status SendStatus
+       switch cmd.Code {
+       case FlushDiskTimeout:
+               status = SendFlushDiskTimeout
+       case FlushSlaveTimeout:
+               status = SendFlushSlaveTimeout
+       case SlaveNotAvailable:
+               status = SendSlaveNotAvailable
+       case Success:
+               status = SendOK
+       default:
+               // TODO process unknown code
+       }
+
+       sendResponse := &SendMessageResponse{}
+       sendResponse.Decode(cmd.ExtFields)
+
+       msgIDs := make([]string, 0)
+       for i := 0; i < len(msgs); i++ {
+               msgIDs = append(msgIDs, 
msgs[i].Properties[UniqueClientMessageIdKeyindex])
+
+       }
+
+       regionId := cmd.ExtFields[MsgRegion]
+       trace := cmd.ExtFields[TraceSwitch]
+
+       if regionId == "" {
+               regionId = defaultTraceRegionID
+       }
+
+       return &SendResult{
+               Status:      status,
+               MsgIDs:      msgIDs,
+               OffsetMsgID: sendResponse.MsgId,
+               MessageQueue: &MessageQueue{
+                       Topic:      msgs[0].Topic,
+                       BrokerName: brokerName,
+                       QueueId:    int(sendResponse.QueueId),
+               },
+               QueueOffset:   sendResponse.QueueOffset,
+               TransactionID: sendResponse.TransactionId,
+               RegionID:      regionId,
+               TraceOn:       trace != "" && trace != tranceOff,
+       }
+}
+
+// PullMessage with sync
+func PullMessage(ctx context.Context, brokerAddrs string, request 
*PullMessageRequest) (*PullResult, error) {
+       return nil, nil
+}
+
 // PullMessageAsync pull message async
-func PullMessageAsync(request *PullMessageRequest, f func(result *PullResult)) 
error {
+func PullMessageAsync(ctx context.Context, brokerAddrs string, request 
*PullMessageRequest, f func(result *PullResult)) error {
        return nil
 }
 
@@ -76,20 +165,20 @@ const (
 
 // SendResult rocketmq send result
 type SendResult struct {
-       sendStatus    SendStatus
-       msgID         string
-       messageQueue  MessageQueue
-       queueOffset   int64
-       transactionID string
-       offsetMsgID   string
-       regionID      string
-       traceOn       bool
+       Status        SendStatus
+       MsgIDs        []string
+       MessageQueue  *MessageQueue
+       QueueOffset   int64
+       TransactionID string
+       OffsetMsgID   string
+       RegionID      string
+       TraceOn       bool
 }
 
 // SendResult send message result to string(detail result)
 func (result *SendResult) String() string {
-       return fmt.Sprintf("SendResult [sendStatus=%d, msgId=%s, 
offsetMsgId=%s, queueOffset=%d, messageQueue=%s]",
-               result.sendStatus, result.msgID, result.offsetMsgID, 
result.queueOffset, result.messageQueue.String())
+       return fmt.Sprintf("SendResult [sendStatus=%d, msgIds=%s, 
offsetMsgId=%s, queueOffset=%d, messageQueue=%s]",
+               result.Status, result.MsgIDs, result.OffsetMsgID, 
result.QueueOffset, result.MessageQueue.String())
 }
 
 // PullResult the pull result
@@ -117,9 +206,58 @@ const (
 type MessageQueue struct {
        Topic      string `json:"topic"`
        BrokerName string `json:"brokerName"`
-       QueueId    int32  `json:"queueId"`
+       QueueId    int    `json:"queueId"`
 }
 
 func (mq *MessageQueue) String() string {
        return fmt.Sprintf("MessageQueue [topic=%s, brokerName=%s, 
queueId=%d]", mq.Topic, mq.BrokerName, mq.QueueId)
 }
+
+func CheckClientInBroker() {
+
+}
+
+func SendHeartbeatToAllBrokerWithLock() {
+
+}
+
+func UpdateTopicRouteInfoFromNameServer(topic string) {
+}
+
+func RegisterConsumer(group string, consumer InnerConsumer) {
+
+}
+
+func UnregisterConsumer(group string) {
+
+}
+
+func RegisterProducer(group string, producer InnerProducer) {
+
+}
+
+func UnregisterProducer(group string) {
+
+}
+
+func SelectProducer(group string) InnerProducer {
+       return nil
+}
+
+func SelectConsumer(group string) InnerConsumer {
+       return nil
+}
+
+func FindBrokerAddressInPublish(brokerName string) {
+
+}
+
+func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, 
onlyThisBroker bool) *FindBrokerResult {
+       return nil
+}
+
+type FindBrokerResult struct {
+       brokerAddr    string
+       slave         bool
+       brokerVersion int32
+}
diff --git a/common/message.go b/common/message.go
index e7b64ae..08ead01 100644
--- a/common/message.go
+++ b/common/message.go
@@ -40,7 +40,7 @@ const (
        ReconsumeTime                  = "RECONSUME_TIME"
        MsgRegion                      = "MSG_REGION"
        TraceSwitch                    = "TRACE_ON"
-       UniqeClientMessageIdKeyindex   = "UNIQ_KEY"
+       UniqueClientMessageIdKeyindex  = "UNIQ_KEY"
        MaxReconsumeTimes              = "MAX_RECONSUME_TIMES"
        ConsumeStartTime               = "CONSUME_START_TIME"
        TranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET"
diff --git a/common/perm.go b/common/perm.go
new file mode 100644
index 0000000..13f1e7e
--- /dev/null
+++ b/common/perm.go
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package common
+
+const (
+       permPriority = 0x1 << 3
+       permRead     = 0x1 << 2
+       permWrite    = 0x1 << 1
+       permInherit  = 0x1 << 0
+)
+
+func isReadable(perm int) bool {
+       return (perm & permRead) == permRead
+}
+
+func isWriteable(perm int) bool {
+       return (perm & permWrite) == permWrite
+}
+
+func isInherited(perm int) bool {
+       return (perm & permInherit) == permInherit
+}
+
+func perm2string(perm int) string {
+       bytes := make([]byte, 3)
+       for i := 0; i < 3; i++ {
+               bytes[i] = '-'
+       }
+
+       if isReadable(perm) {
+               bytes[0] = 'R'
+       }
+
+       if isWriteable(perm) {
+               bytes[1] = 'W'
+       }
+
+       if isInherited(perm) {
+               bytes[2] = 'X'
+       }
+
+       return string(bytes)
+}
diff --git a/common/request.go b/common/request.go
index 48be702..83d74a0 100644
--- a/common/request.go
+++ b/common/request.go
@@ -17,6 +17,11 @@ limitations under the License.
 
 package common
 
+const (
+       GetRouteInfoByTopic = int16(105)
+       SendBatchMessage    = int16(320)
+)
+
 type SendMessageRequest struct {
        ProducerGroup         string `json:"producerGroup"`
        Topic                 string `json:"topic"`
@@ -32,6 +37,14 @@ type SendMessageRequest struct {
        MaxReconsumeTimes     int    `json:"maxReconsumeTimes"`
 }
 
+func (request *SendMessageRequest) Encode() map[string]string {
+       return nil
+}
+
+func (request *SendMessageRequest) Decode(properties map[string]string) error {
+       return nil
+}
+
 type PullMessageRequest struct {
        ConsumerGroup        string `json:"consumerGroup"`
        Topic                string `json:"topic"`
@@ -68,3 +81,15 @@ type UpdateConsumerOffsetRequest struct {
        QueueId       int32  `json:"queueId"`
        CommitOffset  int64  `json:"commitOffset"`
 }
+
+type GetRouteInfoRequest struct {
+       Topic string `json:"topic"`
+}
+
+func (request *GetRouteInfoRequest) Encode() map[string]string {
+       return nil
+}
+
+func (request *GetRouteInfoRequest) Decode(properties map[string]string) error 
{
+       return nil
+}
diff --git a/common/response.go b/common/response.go
index d4d96e5..0fc0101 100644
--- a/common/response.go
+++ b/common/response.go
@@ -17,6 +17,14 @@ limitations under the License.
 
 package common
 
+const (
+       Success           = int16(0)
+       FlushDiskTimeout  = int16(10)
+       SlaveNotAvailable = int16(11)
+       FlushSlaveTimeout = int16(12)
+       TopicNotExist     = int16(17)
+)
+
 type SendMessageResponse struct {
        MsgId         string
        QueueId       int32
@@ -25,6 +33,10 @@ type SendMessageResponse struct {
        MsgRegion     string
 }
 
+func (response *SendMessageResponse) Decode(properties map[string]string) {
+
+}
+
 type PullMessageResponse struct {
        SuggestWhichBrokerId int64
        NextBeginOffset      int64
diff --git a/common/route.go b/common/route.go
index 8b79e12..33f25e1 100644
--- a/common/route.go
+++ b/common/route.go
@@ -17,5 +17,305 @@ limitations under the License.
 
 package common
 
-type RouteInfo struct {
+import (
+       "encoding/json"
+       "github.com/apache/rocketmq-client-go/remote"
+       "github.com/pkg/errors"
+       log "github.com/sirupsen/logrus"
+       "sort"
+       "strconv"
+       "strings"
+       "sync"
+       "sync/atomic"
+       "time"
+)
+
+const (
+       requestTimeout   = 3000
+       defaultTopic     = "TBW102"
+       defaultQueueNums = 4
+       masterId         = int64(0)
+)
+
+var (
+       ErrTopicNotExist = errors.New("topic not exist")
+)
+
+var (
+       brokerAddressesMap sync.Map
+       publishInfoMap     sync.Map
+       routeDataMap       sync.Map
+       lockNamesrv        sync.Mutex
+)
+
+// key is topic, value is TopicPublishInfo
+type TopicPublishInfo struct {
+       OrderTopic          bool
+       HaveTopicRouterInfo bool
+       MqList              []*MessageQueue
+       RouteData           *topicRouteData
+       TopicQueueIndex     int32
+}
+
+func (info *TopicPublishInfo) isOK() (bIsTopicOk bool) {
+       return len(info.MqList) > 0
+}
+
+func (info *TopicPublishInfo) fetchQueueIndex() int {
+       length := len(info.MqList)
+       if length <= 0 {
+               return -1
+       }
+       qIndex := atomic.AddInt32(&info.TopicQueueIndex, 1)
+       return int(qIndex) % length
+}
+
+func tryToFindTopicPublishInfo(topic string) *TopicPublishInfo {
+       value, exist := publishInfoMap.Load(topic)
+
+       var info *TopicPublishInfo
+       if exist {
+               info = value.(*TopicPublishInfo)
+       }
+
+       if info == nil || !info.isOK() {
+               updateTopicRouteInfo(topic)
+               value, exist = publishInfoMap.Load(topic)
+               if !exist {
+                       info = &TopicPublishInfo{HaveTopicRouterInfo: false}
+               } else {
+                       info = value.(*TopicPublishInfo)
+               }
+       }
+
+       if info.HaveTopicRouterInfo || info.isOK() {
+               return info
+       }
+
+       value, exist = publishInfoMap.Load(topic)
+       if exist {
+               return value.(*TopicPublishInfo)
+       }
+
+       return nil
+}
+
+func updateTopicRouteInfo(topic string) {
+       // Todo process lock timeout
+       lockNamesrv.Lock()
+       defer lockNamesrv.Unlock()
+
+       RouteData, err := queryTopicRouteInfoFromServer(topic, requestTimeout)
+       if err != nil {
+               log.Warningf("query topic route from server error: %s", err)
+               return
+       }
+
+       if RouteData == nil {
+               log.Warningf("queryTopicRouteInfoFromServer return nil, Topic: 
%s", topic)
+               return
+       }
+
+       var changed bool
+       oldRouteData, exist := routeDataMap.Load(topic)
+       if !exist || RouteData == nil {
+               changed = true
+       } else {
+               changed = 
topicRouteDataIsChange(oldRouteData.(*topicRouteData), RouteData)
+       }
+
+       if !changed {
+               changed = isNeedUpdateTopicRouteInfo(topic)
+       } else {
+               log.Infof("the topic[%s] route info changed, old[%s] ,new[%s]", 
topic, oldRouteData, RouteData)
+       }
+
+       if !changed {
+               return
+       }
+
+       newTopicRouteData := RouteData.clone()
+
+       for _, brokerData := range newTopicRouteData.brokerDataList {
+               brokerAddressesMap.Store(brokerData.brokerName, 
brokerData.brokerAddresses)
+       }
+
+       // update publish info
+       publishInfo := RouteData2PublishInfo(topic, RouteData)
+       publishInfo.HaveTopicRouterInfo = true
+
+       old, _ := publishInfoMap.Load(topic)
+       publishInfoMap.Store(topic, publishInfoMap)
+       if old != nil {
+               log.Infof("Old TopicPublishInfo [%s] removed.", old)
+       }
+}
+
+func queryTopicRouteInfoFromServer(topic string, timeout time.Duration) 
(*topicRouteData, error) {
+       request := &GetRouteInfoRequest{
+               Topic: topic,
+       }
+       rc := remote.NewRemotingCommand(GetRouteInfoByTopic, request, nil)
+
+       response, err := remote.InvokeSync(getNameServerAddress(), rc, timeout)
+
+       if err != nil {
+               return nil, err
+       }
+
+       switch response.Code {
+       case Success:
+               if response.Body == nil {
+                       return nil, errors.New(response.Remark)
+               }
+               RouteData := &topicRouteData{}
+               err = json.Unmarshal(response.Body, RouteData)
+               if err != nil {
+                       log.Warningf("unmarshal topicRouteData error: %s", err)
+                       return nil, err
+               }
+               return RouteData, nil
+       case TopicNotExist:
+               return nil, ErrTopicNotExist
+       default:
+               return nil, errors.New(response.Remark)
+       }
+}
+
+func topicRouteDataIsChange(oldData *topicRouteData, newData *topicRouteData) 
bool {
+       if oldData == nil || newData == nil {
+               return true
+       }
+       oldDataCloned := oldData.clone()
+       newDataCloned := newData.clone()
+
+       sort.Slice(oldDataCloned.queueDataList, func(i, j int) bool {
+               return 
strings.Compare(oldDataCloned.queueDataList[i].brokerName, 
oldDataCloned.queueDataList[j].brokerName) > 0
+       })
+       sort.Slice(oldDataCloned.brokerDataList, func(i, j int) bool {
+               return 
strings.Compare(oldDataCloned.brokerDataList[i].brokerName, 
oldDataCloned.brokerDataList[j].brokerName) > 0
+       })
+       sort.Slice(newDataCloned.queueDataList, func(i, j int) bool {
+               return 
strings.Compare(newDataCloned.queueDataList[i].brokerName, 
newDataCloned.queueDataList[j].brokerName) > 0
+       })
+       sort.Slice(newDataCloned.brokerDataList, func(i, j int) bool {
+               return 
strings.Compare(newDataCloned.brokerDataList[i].brokerName, 
newDataCloned.brokerDataList[j].brokerName) > 0
+       })
+
+       return !oldDataCloned.equals(newDataCloned)
+}
+
+func isNeedUpdateTopicRouteInfo(topic string) bool {
+       value, exist := publishInfoMap.Load(topic)
+
+       return !exist || value.(*TopicPublishInfo).isOK()
+}
+
+func RouteData2PublishInfo(topic string, data *topicRouteData) 
*TopicPublishInfo {
+       publishInfo := &TopicPublishInfo{
+               RouteData:  data,
+               OrderTopic: false,
+       }
+
+       if data.OrderTopicConf != "" {
+               brokers := strings.Split(data.OrderTopicConf, ";")
+               for _, broker := range brokers {
+                       item := strings.Split(broker, ":")
+                       nums, _ := strconv.Atoi(item[1])
+                       for i := 0; i < nums; i++ {
+                               mq := &MessageQueue{
+                                       Topic:      topic,
+                                       BrokerName: item[0],
+                                       QueueId:    i,
+                               }
+                               publishInfo.MqList = append(publishInfo.MqList, 
mq)
+                       }
+               }
+
+               publishInfo.OrderTopic = true
+               return publishInfo
+       }
+
+       qds := data.queueDataList
+       sort.Slice(qds, func(i, j int) bool {
+               return i-j >= 0
+       })
+
+       for _, qd := range qds {
+               if !isWriteable(qd.perm) {
+                       continue
+               }
+
+               var bData *BrokerData
+               for _, bd := range data.brokerDataList {
+                       if bd.brokerName == qd.brokerName {
+                               bData = bd
+                               break
+                       }
+               }
+
+               if bData == nil || bData.brokerAddresses[masterId] == "" {
+                       continue
+               }
+
+               for i := 0; i < qd.writeQueueNums; i++ {
+                       mq := &MessageQueue{
+                               Topic:      topic,
+                               BrokerName: qd.brokerName,
+                               QueueId:    i,
+                       }
+                       publishInfo.MqList = append(publishInfo.MqList, mq)
+               }
+       }
+
+       return publishInfo
+}
+
+func getNameServerAddress() string {
+       return ""
+}
+
+// topicRouteData topicRouteData
+type topicRouteData struct {
+       OrderTopicConf string
+       queueDataList  []*QueueData
+       brokerDataList []*BrokerData
+}
+
+func (RouteData *topicRouteData) clone() *topicRouteData {
+       cloned := &topicRouteData{
+               OrderTopicConf: RouteData.OrderTopicConf,
+               queueDataList:  make([]*QueueData, 
len(RouteData.queueDataList)),
+               brokerDataList: make([]*BrokerData, 
len(RouteData.brokerDataList)),
+       }
+
+       for index, value := range RouteData.queueDataList {
+               cloned.queueDataList[index] = value
+       }
+
+       for index, value := range RouteData.brokerDataList {
+               cloned.brokerDataList[index] = value
+       }
+
+       return cloned
+}
+
+func (RouteData *topicRouteData) equals(data *topicRouteData) bool {
+       return false
+}
+
+// QueueData QueueData
+type QueueData struct {
+       brokerName     string
+       readQueueNums  int
+       writeQueueNums int
+       perm           int
+       topicSynFlag   int
+}
+
+// BrokerData BrokerData
+type BrokerData struct {
+       brokerName          string
+       brokerAddresses     map[int64]string
+       brokerAddressesLock sync.RWMutex
 }
diff --git a/common/route.go b/common/transaction.go
similarity index 95%
copy from common/route.go
copy to common/transaction.go
index 8b79e12..e526af5 100644
--- a/common/route.go
+++ b/common/transaction.go
@@ -17,5 +17,5 @@ limitations under the License.
 
 package common
 
-type RouteInfo struct {
+type TransactionListener interface {
 }
diff --git a/docs/zh/native-design_zh.md b/docs/zh/native-design_zh.md
index e5c96ee..dd407a4 100644
--- a/docs/zh/native-design_zh.md
+++ b/docs/zh/native-design_zh.md
@@ -43,11 +43,11 @@
 NewRemotingCommand(code int16, header CustomHeader) *RemotingCommand 
 
 // send a request to servers and return until response received.
-InvokeSync(addr string, request *RemotingCommand, timeout time.Duration) 
(*RemotingCommand, error) 
+SendMessageSync(ctx context.Context, brokerAddrs, brokerName string, request 
*SendMessageRequest, msgs []*Message) (*SendResult, error)
 
-InvokeAsync(addr string, request *RemotingCommand, timeout time.Duration, f 
func(*RemotingCommand)) error 
+SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, request 
*SendMessageRequest, msgs []*Message, f func(result *SendResult)) error
 
-InvokeOneWay(addr string, request *RemotingCommand, timeout time.Duration) 
error 
+SendMessageOneWay(ctx context.Context, brokerAddrs string, request 
*SendMessageRequest, msgs []*Message) (*SendResult, error)
 ```
 
 ### common
@@ -61,10 +61,10 @@ SendMessage(topic string, msgs *[]Message) error
 SendMessageAsync(topic string, msgs *[]Message, f func(result *SendResult)) 
error 
 
 // PullMessage with sync
-PullMessage(request *PullMessageRequest) error 
+PullMessage(ctx context.Context, brokerAddrs string, request 
*PullMessageRequest) (*PullResult, error)
 
 // PullMessageAsync pull message async
-PullMessageAsync(request *PullMessageRequest, f func(result *PullResult)) 
error 
+func PullMessageAsync(ctx context.Context, brokerAddrs string, request 
*PullMessageRequest, f func(result *PullResult)) error
 
 // QueryMaxOffset with specific queueId and topic
 QueryMaxOffset(topic string, queueId int) error 
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..809e20a
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,12 @@
+module github.com/apache/rocketmq-client-go
+
+go 1.12
+
+require (
+       github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // 
indirect
+       github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // 
indirect
+       github.com/pkg/errors v0.8.1
+       github.com/sirupsen/logrus v1.3.0
+       github.com/stretchr/testify v1.3.0
+       gopkg.in/alecthomas/kingpin.v2 v2.2.6
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..62f4cad
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,26 @@
+github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc 
h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
+github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod 
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf 
h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
+github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod 
h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1 
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod 
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
+github.com/pkg/errors v0.8.1/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/sirupsen/logrus v1.3.0 
h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
+github.com/sirupsen/logrus v1.3.0/go.mod 
h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.1.1/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.2.2/go.mod 
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0 
h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod 
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 
h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod 
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 
h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+gopkg.in/alecthomas/kingpin.v2 v2.2.6 
h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
+gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod 
h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
diff --git a/remote/codec.go b/remote/codec.go
index ca50fa6..d556cfb 100644
--- a/remote/codec.go
+++ b/remote/codec.go
@@ -33,7 +33,7 @@ const (
        // 1, RPC
        RPCOneWay = 1
 
-       //ResponseType for reponse
+       //ResponseType for response
        ResponseType = 1
 
        _Flag         = 0
@@ -53,15 +53,24 @@ type RemotingCommand struct {
        Body      []byte            `json:"-"`
 }
 
-func NewRemotingCommand(code int16, properties map[string]string, body []byte) 
*RemotingCommand {
-       return &RemotingCommand{
-               Code:      code,
-               Language:  _LanguageCode,
-               Version:   _Version,
-               Opaque:    atomic.AddInt32(&opaque, 1),
-               ExtFields: properties,
-               Body:      body,
+type CustomHeader interface {
+       Encode() map[string]string
+}
+
+func NewRemotingCommand(code int16, header CustomHeader, body []byte) 
*RemotingCommand {
+       cmd := &RemotingCommand{
+               Code:     code,
+               Language: _LanguageCode,
+               Version:  _Version,
+               Opaque:   atomic.AddInt32(&opaque, 1),
+               Body:     body,
        }
+
+       if header != nil {
+               cmd.ExtFields = header.Encode()
+       }
+
+       return cmd
 }
 
 func (command *RemotingCommand) isResponseType() bool {

Reply via email to