This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 757420d  [ISSUE #301] Adding GET_CONSUMER_RUNNING_INFO (#311)
757420d is described below

commit 757420dcb1e8441021bab6845c5c6f054a80aa77
Author: wenfeng <[email protected]>
AuthorDate: Thu Nov 28 16:03:22 2019 +0800

    [ISSUE #301] Adding GET_CONSUMER_RUNNING_INFO (#311)
    
    * Adding GET_CONSUMER_RUNNING_INFO impl
    
    * add Marshal ConsumerRunningInfo Test Case
---
 consumer/consumer.go      |  17 +--
 consumer/process_queue.go |  51 +++++++++
 consumer/push_consumer.go |  39 +++++++
 internal/client.go        |  49 ++++++---
 internal/mock_namesrv.go  |  30 ++++++
 internal/model.go         | 160 ++++++++++++++++++++++++++++
 internal/model_test.go    | 263 ++++++++++++++++++++++++++++++++++++++++++++--
 internal/namesrv.go       |   6 ++
 internal/request.go       |  43 +++++---
 9 files changed, 617 insertions(+), 41 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 13cef4e..070cd19 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -236,12 +236,13 @@ type defaultConsumer struct {
         *
         * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
         */
-       consumerGroup  string
-       model          MessageModel
-       allocate       func(string, string, []*primitive.MessageQueue, 
[]string) []*primitive.MessageQueue
-       unitMode       bool
-       consumeOrderly bool
-       fromWhere      ConsumeFromWhere
+       consumerGroup          string
+       model                  MessageModel
+       allocate               func(string, string, []*primitive.MessageQueue, 
[]string) []*primitive.MessageQueue
+       unitMode               bool
+       consumeOrderly         bool
+       fromWhere              ConsumeFromWhere
+       consumerStartTimestamp int64
 
        cType     ConsumeType
        client    internal.RMQClient
@@ -250,7 +251,7 @@ type defaultConsumer struct {
        pause     bool
        once      sync.Once
        option    consumerOptions
-       // key: int, hash(*primitive.MessageQueue)
+       // key: primitive.MessageQueue
        // value: *processQueue
        processQueueTable sync.Map
 
@@ -287,7 +288,7 @@ func (dc *defaultConsumer) start() error {
        dc.client.UpdateTopicRouteInfo()
        dc.client.Start()
        dc.state = internal.StateRunning
-
+       dc.consumerStartTimestamp = time.Now().UnixNano() / 
int64(time.Millisecond)
        return nil
 }
 
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 13b2d25..4f4dc0c 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -28,6 +28,7 @@ import (
        gods_util "github.com/emirpasic/gods/utils"
        uatomic "go.uber.org/atomic"
 
+       "github.com/apache/rocketmq-client-go/internal"
        "github.com/apache/rocketmq-client-go/primitive"
        "github.com/apache/rocketmq-client-go/rlog"
 )
@@ -277,6 +278,28 @@ func (pq *processQueue) Max() int64 {
        return -1
 }
 
+// MinOrderlyCache returns the smallest key held in
+// consumingMsgOrderlyTreeMap. It returns -1 when the map is empty or when the
+// map reports a nil minimum key. Keys are asserted to be int64.
+func (pq *processQueue) MinOrderlyCache() int64 {
+       if !pq.consumingMsgOrderlyTreeMap.Empty() {
+               if key, _ := pq.consumingMsgOrderlyTreeMap.Min(); key != nil {
+                       return key.(int64)
+               }
+       }
+       return -1
+}
+
+// MaxOrderlyCache returns the largest key held in
+// consumingMsgOrderlyTreeMap. It returns -1 when the map is empty or when the
+// map reports a nil maximum key. Keys are asserted to be int64.
+func (pq *processQueue) MaxOrderlyCache() int64 {
+       if !pq.consumingMsgOrderlyTreeMap.Empty() {
+               if key, _ := pq.consumingMsgOrderlyTreeMap.Max(); key != nil {
+                       return key.(int64)
+               }
+       }
+       return -1
+}
+
 func (pq *processQueue) clear() {
        pq.mutex.Lock()
        pq.msgCache.Clear()
@@ -302,3 +325,31 @@ func (pq *processQueue) commit() int64 {
        pq.consumingMsgOrderlyTreeMap.Clear()
        return offset + 1
 }
+
+func (pq *processQueue) currentInfo() internal.ProcessQueueInfo {
+       pq.mutex.RLock()
+       defer pq.mutex.RUnlock()
+       info := internal.ProcessQueueInfo{
+               Locked:               pq.locked.Load(),
+               TryUnlockTimes:       pq.tryUnlockTimes,
+               LastLockTimestamp:    pq.lastLockTime.UnixNano() / 10e6,
+               Dropped:              pq.dropped.Load(),
+               LastPullTimestamp:    pq.lastPullTime.UnixNano() / 10e6,
+               LastConsumeTimestamp: pq.lastConsumeTime.UnixNano() / 10e6,
+       }
+
+       if !pq.msgCache.Empty() {
+               info.CachedMsgMinOffset = pq.Min()
+               info.CachedMsgMaxOffset = pq.Max()
+               info.CachedMsgCount = pq.msgCache.Size()
+               info.CachedMsgSizeInMiB = pq.cachedMsgSize / int64(1024*1024)
+       }
+
+       if !pq.consumingMsgOrderlyTreeMap.Empty() {
+               info.TransactionMsgMinOffset = pq.MinOrderlyCache()
+               info.TransactionMsgMaxOffset = pq.MaxOrderlyCache()
+               info.TransactionMsgCount = pq.consumingMsgOrderlyTreeMap.Size()
+       }
+
+       return info
+}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 3c5b79a..98b0033 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -273,6 +273,45 @@ func (pc *pushConsumer) IsUnitMode() bool {
        return pc.unitMode
 }
 
+// GetConsumerRunningInfo assembles a report describing this push consumer:
+// one entry per subscribed topic (subscription data plus pull/consume
+// statistics), one entry per process queue (queue snapshot plus the commit
+// offset read from storage), and a set of string properties (name server
+// addresses, consume type, orderly flag, thread pool size, start timestamp).
+func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
+       runningInfo := internal.NewConsumerRunningInfo()
+
+       // Subscriptions and their per-topic statistics.
+       collectTopic := func(key, value interface{}) bool {
+               topic := key.(string)
+               runningInfo.SubscriptionData[value.(*internal.SubscriptionData)] = true
+               runningInfo.StatusTable[topic] = internal.ConsumeStatus{
+                       PullRT:            getPullRT(topic, pc.consumerGroup).avgpt,
+                       PullTPS:           getPullTPS(topic, pc.consumerGroup).tps,
+                       ConsumeRT:         getConsumeRT(topic, pc.consumerGroup).avgpt,
+                       ConsumeOKTPS:      getConsumeOKTPS(topic, pc.consumerGroup).tps,
+                       ConsumeFailedTPS:  getConsumeFailedTPS(topic, pc.consumerGroup).tps,
+                       ConsumeFailedMsgs: topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + pc.consumerGroup).sum,
+               }
+               return true
+       }
+       pc.subscriptionDataTable.Range(collectTopic)
+
+       // Process queues, keyed by message queue value.
+       collectQueue := func(key, value interface{}) bool {
+               queue := key.(primitive.MessageQueue)
+               snapshot := value.(*processQueue).currentInfo()
+               snapshot.CommitOffset = pc.storage.read(&queue, _ReadMemoryThenStore)
+               runningInfo.MQTable[queue] = snapshot
+               return true
+       }
+       pc.processQueueTable.Range(collectQueue)
+
+       // Name server addresses are joined with a trailing ';' after each one.
+       addrs := ""
+       for _, addr := range pc.namesrv.AddrList() {
+               addrs += fmt.Sprintf("%s;", addr)
+       }
+
+       props := runningInfo.Properties
+       props[internal.PropNameServerAddr] = addrs
+       props[internal.PropConsumeType] = string(pc.cType)
+       props[internal.PropConsumeOrderly] = strconv.FormatBool(pc.consumeOrderly)
+       props[internal.PropThreadPoolCoreSize] = "-1"
+       props[internal.PropConsumerStartTimestamp] = strconv.FormatInt(pc.consumerStartTimestamp, 10)
+       return runningInfo
+}
+
 func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided 
[]*primitive.MessageQueue) {
        v, exit := pc.subscriptionDataTable.Load(topic)
        if !exit {
diff --git a/internal/client.go b/internal/client.go
index 3336944..d9c5f6a 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -18,7 +18,6 @@ limitations under the License.
 package internal
 
 import (
-       "bytes"
        "context"
        "errors"
        "fmt"
@@ -36,6 +35,7 @@ import (
 )
 
 const (
+       clientVersion        = "v2.0.0-alpha3"
        defaultTraceRegionID = "DefaultRegion"
 
        // tracing message switch
@@ -83,6 +83,7 @@ type InnerConsumer interface {
        SubscriptionDataList() []*SubscriptionData
        Rebalance()
        IsUnitMode() bool
+       GetConsumerRunningInfo() *ConsumerRunningInfo
 }
 
 func DefaultClientOptions() ClientOptions {
@@ -220,8 +221,30 @@ func GetOrNewRocketMQClient(option ClientOptions, 
callbackCh chan interface{}) R
 
                
client.remoteClient.RegisterRequestFunc(ReqGetConsumerRunningInfo, func(req 
*remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
                        rlog.Info("receive get consumer running info 
request...", nil)
+                       header := new(GetConsumerRunningInfoHeader)
+                       header.Decode(req.ExtFields)
+                       val, exist := clientMap.Load(header.clientID)
                        res := remote.NewRemotingCommand(ResError, nil, nil)
-                       res.Remark = "the go client has not supported consumer 
running info"
+                       if !exist {
+                               res.Remark = fmt.Sprintf("Can't find specified 
client instance of: %s", header.clientID)
+                       } else {
+                               cli, ok := val.(*rmqClient)
+                               var runningInfo *ConsumerRunningInfo
+                               if ok {
+                                       runningInfo = 
cli.getConsumerRunningInfo(header.consumerGroup)
+                               }
+                               if runningInfo != nil {
+                                       res.Code = ResSuccess
+                                       data, err := runningInfo.Encode()
+                                       if err != nil {
+                                               res.Remark = fmt.Sprintf("json 
marshal error: %s", err.Error())
+                                       } else {
+                                               res.Body = data
+                                       }
+                               } else {
+                                       res.Remark = "there is unexpected error 
when get running info, please check log"
+                               }
+                       }
                        return res
                })
        }
@@ -659,6 +682,18 @@ func (c *rmqClient) isNeedUpdateSubscribeInfo(topic 
string) bool {
        return result
 }
 
+// getConsumerRunningInfo looks up the consumer registered under group and
+// returns its running info with PropClientVersion set to clientVersion. It
+// returns nil when no consumer is registered for the group or when the
+// consumer itself reports nil.
+func (c *rmqClient) getConsumerRunningInfo(group string) *ConsumerRunningInfo {
+       registered, found := c.consumerMap.Load(group)
+       if !found {
+               return nil
+       }
+
+       runningInfo := registered.(InnerConsumer).GetConsumerRunningInfo()
+       if runningInfo == nil {
+               return nil
+       }
+       runningInfo.Properties[PropClientVersion] = clientVersion
+       return runningInfo
+}
+
 func routeData2SubscribeInfo(topic string, data *TopicRouteData) 
[]*primitive.MessageQueue {
        list := make([]*primitive.MessageQueue, 0)
        for idx := range data.QueueDataList {
@@ -676,16 +711,6 @@ func routeData2SubscribeInfo(topic string, data 
*TopicRouteData) []*primitive.Me
        return list
 }
 
-func encodeMessages(message []*primitive.Message) []byte {
-       var buffer bytes.Buffer
-       index := 0
-       for index < len(message) {
-               buffer.Write(message[index].Body)
-               index++
-       }
-       return buffer.Bytes()
-}
-
 func brokerVIPChannel(brokerAddr string) string {
        if !_VIPChannelEnable {
                return brokerAddr
diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go
index 630d4cc..19a2181 100644
--- a/internal/mock_namesrv.go
+++ b/internal/mock_namesrv.go
@@ -52,26 +52,31 @@ func (m *MockNamesrvs) EXPECT() *MockNamesrvsMockRecorder {
 
 // AddBroker mocks base method
 func (m *MockNamesrvs) AddBroker(routeData *TopicRouteData) {
+       m.ctrl.T.Helper()
        m.ctrl.Call(m, "AddBroker", routeData)
 }
 
 // AddBroker indicates an expected call of AddBroker
 func (mr *MockNamesrvsMockRecorder) AddBroker(routeData interface{}) 
*gomock.Call {
+       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBroker", 
reflect.TypeOf((*MockNamesrvs)(nil).AddBroker), routeData)
 }
 
 // cleanOfflineBroker mocks base method
 func (m *MockNamesrvs) cleanOfflineBroker() {
+       m.ctrl.T.Helper()
        m.ctrl.Call(m, "cleanOfflineBroker")
 }
 
 // cleanOfflineBroker indicates an expected call of cleanOfflineBroker
 func (mr *MockNamesrvsMockRecorder) cleanOfflineBroker() *gomock.Call {
+       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"cleanOfflineBroker", reflect.TypeOf((*MockNamesrvs)(nil).cleanOfflineBroker))
 }
 
 // UpdateTopicRouteInfo mocks base method
 func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
+       m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "UpdateTopicRouteInfo", topic)
        ret0, _ := ret[0].(*TopicRouteData)
        return ret0
@@ -79,11 +84,13 @@ func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) 
*TopicRouteData {
 
 // UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo
 func (mr *MockNamesrvsMockRecorder) UpdateTopicRouteInfo(topic interface{}) 
*gomock.Call {
+       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"UpdateTopicRouteInfo", 
reflect.TypeOf((*MockNamesrvs)(nil).UpdateTopicRouteInfo), topic)
 }
 
 // FetchPublishMessageQueues mocks base method
 func (m *MockNamesrvs) FetchPublishMessageQueues(topic string) 
([]*primitive.MessageQueue, error) {
+       m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "FetchPublishMessageQueues", topic)
        ret0, _ := ret[0].([]*primitive.MessageQueue)
        ret1, _ := ret[1].(error)
@@ -92,11 +99,13 @@ func (m *MockNamesrvs) FetchPublishMessageQueues(topic 
string) ([]*primitive.Mes
 
 // FetchPublishMessageQueues indicates an expected call of 
FetchPublishMessageQueues
 func (mr *MockNamesrvsMockRecorder) FetchPublishMessageQueues(topic 
interface{}) *gomock.Call {
+       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"FetchPublishMessageQueues", 
reflect.TypeOf((*MockNamesrvs)(nil).FetchPublishMessageQueues), topic)
 }
 
 // FindBrokerAddrByTopic mocks base method
 func (m *MockNamesrvs) FindBrokerAddrByTopic(topic string) string {
+       m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "FindBrokerAddrByTopic", topic)
        ret0, _ := ret[0].(string)
        return ret0
@@ -104,11 +113,13 @@ func (m *MockNamesrvs) FindBrokerAddrByTopic(topic 
string) string {
 
 // FindBrokerAddrByTopic indicates an expected call of FindBrokerAddrByTopic
 func (mr *MockNamesrvsMockRecorder) FindBrokerAddrByTopic(topic interface{}) 
*gomock.Call {
+       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"FindBrokerAddrByTopic", 
reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddrByTopic), topic)
 }
 
 // FindBrokerAddrByName mocks base method
 func (m *MockNamesrvs) FindBrokerAddrByName(brokerName string) string {
+       m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "FindBrokerAddrByName", brokerName)
        ret0, _ := ret[0].(string)
        return ret0
@@ -116,11 +127,13 @@ func (m *MockNamesrvs) FindBrokerAddrByName(brokerName 
string) string {
 
 // FindBrokerAddrByName indicates an expected call of FindBrokerAddrByName
 func (mr *MockNamesrvsMockRecorder) FindBrokerAddrByName(brokerName 
interface{}) *gomock.Call {
+       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"FindBrokerAddrByName", 
reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddrByName), brokerName)
 }
 
 // FindBrokerAddressInSubscribe mocks base method
 func (m *MockNamesrvs) FindBrokerAddressInSubscribe(brokerName string, 
brokerId int64, onlyThisBroker bool) *FindBrokerResult {
+       m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "FindBrokerAddressInSubscribe", brokerName, 
brokerId, onlyThisBroker)
        ret0, _ := ret[0].(*FindBrokerResult)
        return ret0
@@ -128,11 +141,13 @@ func (m *MockNamesrvs) 
FindBrokerAddressInSubscribe(brokerName string, brokerId
 
 // FindBrokerAddressInSubscribe indicates an expected call of 
FindBrokerAddressInSubscribe
 func (mr *MockNamesrvsMockRecorder) FindBrokerAddressInSubscribe(brokerName, 
brokerId, onlyThisBroker interface{}) *gomock.Call {
+       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"FindBrokerAddressInSubscribe", 
reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddressInSubscribe), brokerName, 
brokerId, onlyThisBroker)
 }
 
 // FetchSubscribeMessageQueues mocks base method
 func (m *MockNamesrvs) FetchSubscribeMessageQueues(topic string) 
([]*primitive.MessageQueue, error) {
+       m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "FetchSubscribeMessageQueues", topic)
        ret0, _ := ret[0].([]*primitive.MessageQueue)
        ret1, _ := ret[1].(error)
@@ -141,5 +156,20 @@ func (m *MockNamesrvs) FetchSubscribeMessageQueues(topic 
string) ([]*primitive.M
 
 // FetchSubscribeMessageQueues indicates an expected call of 
FetchSubscribeMessageQueues
 func (mr *MockNamesrvsMockRecorder) FetchSubscribeMessageQueues(topic 
interface{}) *gomock.Call {
+       mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"FetchSubscribeMessageQueues", 
reflect.TypeOf((*MockNamesrvs)(nil).FetchSubscribeMessageQueues), topic)
 }
+
+// AddrList mocks base method. It forwards the call to the gomock controller
+// and returns the first recorded result as a []string (nil if the result has
+// a different type).
+// NOTE(review): this file looks like mockgen output - prefer regenerating
+// over hand edits; confirm.
+func (m *MockNamesrvs) AddrList() []string {
+       m.ctrl.T.Helper()
+       results := m.ctrl.Call(m, "AddrList")
+       addrs, _ := results[0].([]string)
+       return addrs
+}
+
+// AddrList indicates an expected call of AddrList. It records the expectation
+// on the controller together with the method's reflected type.
+func (mr *MockNamesrvsMockRecorder) AddrList() *gomock.Call {
+       mr.mock.ctrl.T.Helper()
+       methodType := reflect.TypeOf((*MockNamesrvs)(nil).AddrList)
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddrList", methodType)
+}
diff --git a/internal/model.go b/internal/model.go
index 8c95458..ea1fdac 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -18,9 +18,14 @@ limitations under the License.
 package internal
 
 import (
+       "bytes"
        "encoding/json"
+       "fmt"
+       "sort"
+       "strings"
 
        "github.com/apache/rocketmq-client-go/internal/utils"
+       "github.com/apache/rocketmq-client-go/primitive"
        "github.com/apache/rocketmq-client-go/rlog"
 )
 
@@ -100,3 +105,158 @@ func (data *heartbeatData) encode() []byte {
        rlog.Debug("heartbeat: "+string(d), nil)
        return d
 }
+
+// Keys used in ConsumerRunningInfo.Properties.
+const (
+       // PropNameServerAddr is the key for the name server address list.
+       PropNameServerAddr         = "PROP_NAMESERVER_ADDR"
+       // PropThreadPoolCoreSize is the key for the consumer thread pool size.
+       PropThreadPoolCoreSize     = "PROP_THREADPOOL_CORE_SIZE"
+       // PropConsumeOrderly is the key for the orderly-consumption flag.
+       PropConsumeOrderly         = "PROP_CONSUMEORDERLY"
+       // PropConsumeType is the key for the consume type.
+       PropConsumeType            = "PROP_CONSUME_TYPE"
+       // PropClientVersion is the key for the client version string.
+       PropClientVersion          = "PROP_CLIENT_VERSION"
+       // PropConsumerStartTimestamp is the key for the consumer start time.
+       PropConsumerStartTimestamp = "PROP_CONSUMER_START_TIMESTAMP"
+)
+
+// ProcessQueueInfo is the JSON-serializable snapshot of one process queue:
+// commit offset, cached-message offsets/count/size, transaction-message
+// offsets/count, lock and drop state, and the last lock/pull/consume
+// timestamps.
+// NOTE(review): the timestamp fields are presumably Unix milliseconds -
+// confirm against the code that fills them.
+type ProcessQueueInfo struct {
+       CommitOffset            int64 `json:"commitOffset"`
+       CachedMsgMinOffset      int64 `json:"cachedMsgMinOffset"`
+       CachedMsgMaxOffset      int64 `json:"cachedMsgMaxOffset"`
+       CachedMsgCount          int   `json:"cachedMsgCount"`
+       CachedMsgSizeInMiB      int64 `json:"cachedMsgSizeInMiB"`
+       TransactionMsgMinOffset int64 `json:"transactionMsgMinOffset"`
+       TransactionMsgMaxOffset int64 `json:"transactionMsgMaxOffset"`
+       TransactionMsgCount     int   `json:"transactionMsgCount"`
+       Locked                  bool  `json:"locked"`
+       TryUnlockTimes          int64 `json:"tryUnlockTimes"`
+       LastLockTimestamp       int64 `json:"lastLockTimestamp"`
+       Dropped                 bool  `json:"dropped"`
+       LastPullTimestamp       int64 `json:"lastPullTimestamp"`
+       LastConsumeTimestamp    int64 `json:"lastConsumeTimestamp"`
+}
+
+// ConsumeStatus is the JSON-serializable set of pull and consume statistics
+// reported for one topic: pull/consume response times, pull/consume-OK/
+// consume-failed throughput, and a count of failed messages.
+type ConsumeStatus struct {
+       PullRT            float64 `json:"pullRT"`
+       PullTPS           float64 `json:"pullTPS"`
+       ConsumeRT         float64 `json:"consumeRT"`
+       ConsumeOKTPS      float64 `json:"consumeOKTPS"`
+       ConsumeFailedTPS  float64 `json:"consumeFailedTPS"`
+       ConsumeFailedMsgs int64   `json:"consumeFailedMsgs"`
+}
+
+// ConsumerRunningInfo is the report returned for a consumer running info
+// request. The fields carry no json tags; serialization is done by hand in
+// Encode.
+type ConsumerRunningInfo struct {
+       // Properties holds string properties, keyed by the Prop* constants
+       // among others.
+       Properties       map[string]string
+       // SubscriptionData is used as a set of subscriptions (the bool value
+       // is not read by Encode).
+       SubscriptionData map[*SubscriptionData]bool
+       // MQTable maps each message queue to its process queue snapshot.
+       MQTable          map[primitive.MessageQueue]ProcessQueueInfo
+       // StatusTable maps a topic name to its consume statistics.
+       StatusTable      map[string]ConsumeStatus
+}
+
+func (info ConsumerRunningInfo) Encode() ([]byte, error) {
+       data, err := json.Marshal(info.Properties)
+       if err != nil {
+               return nil, err
+       }
+       jsonData := fmt.Sprintf("{\"%s\":%s", "properties", string(data))
+
+       data, err = json.Marshal(info.StatusTable)
+       if err != nil {
+               return nil, err
+       }
+       jsonData = fmt.Sprintf("%s,\"%s\":%s", jsonData, "statusTable", 
string(data))
+
+       subs := make([]*SubscriptionData, len(info.SubscriptionData))
+       idx := 0
+       for k := range info.SubscriptionData {
+               subs[idx] = k
+               idx++
+       }
+
+       // make sure test case table
+       sort.Slice(subs, func(i, j int) bool {
+               sub1 := subs[i]
+               sub2 := subs[j]
+               if sub1.ClassFilterMode != sub2.ClassFilterMode {
+                       return sub1.ClassFilterMode == false
+               }
+               com := strings.Compare(sub1.Topic, sub1.Topic)
+               if com != 0 {
+                       return com > 0
+               }
+
+               com = strings.Compare(sub1.SubString, sub1.SubString)
+               if com != 0 {
+                       return com > 0
+               }
+
+               if sub1.SubVersion != sub2.SubVersion {
+                       return sub1.SubVersion > sub2.SubVersion
+               }
+
+               com = strings.Compare(sub1.ExpType, sub1.ExpType)
+               if com != 0 {
+                       return com > 0
+               }
+
+               v1, _ := sub1.Tags.MarshalJSON()
+               v2, _ := sub2.Tags.MarshalJSON()
+               com = bytes.Compare(v1, v2)
+               if com != 0 {
+                       return com > 0
+               }
+
+               v1, _ = sub1.Codes.MarshalJSON()
+               v2, _ = sub2.Codes.MarshalJSON()
+               com = bytes.Compare(v1, v2)
+               if com != 0 {
+                       return com > 0
+               }
+               return true
+       })
+
+       data, err = json.Marshal(subs)
+       if err != nil {
+               return nil, err
+       }
+       jsonData = fmt.Sprintf("%s,\"%s\":%s", jsonData, "subscriptionSet", 
string(data))
+
+       tableJson := ""
+       keys := make([]primitive.MessageQueue, 0)
+
+       for k := range info.MQTable {
+               keys = append(keys, k)
+       }
+
+       sort.Slice(keys, func(i, j int) bool {
+               q1 := keys[i]
+               q2 := keys[j]
+               com := strings.Compare(q1.Topic, q2.Topic)
+               if com != 0 {
+                       return com < 0
+               }
+
+               com = strings.Compare(q1.BrokerName, q2.BrokerName)
+               if com != 0 {
+                       return com < 0
+               }
+
+               return q1.QueueId < q2.QueueId
+       })
+
+       for idx := range keys {
+               dataK, err := json.Marshal(keys[idx])
+               if err != nil {
+                       return nil, err
+               }
+               dataV, err := json.Marshal(info.MQTable[keys[idx]])
+               tableJson = fmt.Sprintf("%s,%s:%s", tableJson, string(dataK), 
string(dataV))
+       }
+       tableJson = strings.TrimLeft(tableJson, ",")
+       jsonData = fmt.Sprintf("%s,\"%s\":%s}", jsonData, "mqTable", 
fmt.Sprintf("{%s}", tableJson))
+       return []byte(jsonData), nil
+}
+
+// NewConsumerRunningInfo returns a ConsumerRunningInfo whose four maps are
+// allocated and empty, so callers can insert into them directly.
+func NewConsumerRunningInfo() *ConsumerRunningInfo {
+       info := new(ConsumerRunningInfo)
+       info.Properties = map[string]string{}
+       info.SubscriptionData = map[*SubscriptionData]bool{}
+       info.MQTable = map[primitive.MessageQueue]ProcessQueueInfo{}
+       info.StatusTable = map[string]ConsumeStatus{}
+       return info
+}
diff --git a/internal/model_test.go b/internal/model_test.go
index 1dac4ec..56703e0 100644
--- a/internal/model_test.go
+++ b/internal/model_test.go
@@ -20,12 +20,14 @@ package internal
 import (
        "encoding/json"
        "fmt"
+       "strings"
        "testing"
 
        . "github.com/smartystreets/goconvey/convey"
-       "github.com/stretchr/testify/assert"
+       "github.com/tidwall/gjson"
 
        "github.com/apache/rocketmq-client-go/internal/utils"
+       "github.com/apache/rocketmq-client-go/primitive"
 )
 
 func TestHeartbeatData(t *testing.T) {
@@ -43,7 +45,7 @@ func TestHeartbeatData(t *testing.T) {
                        set.Add(pData2)
 
                        v, err := json.Marshal(set)
-                       assert.Nil(t, err)
+                       So(err, ShouldBeNil)
                        fmt.Printf("json producer set: %s", string(v))
                })
 
@@ -61,8 +63,7 @@ func TestHeartbeatData(t *testing.T) {
                        hbt.ProducerDatas.Add(p2)
 
                        v, err := json.Marshal(hbt)
-                       //ShouldBeNil(t, err)
-                       assert.Nil(t, err)
+                       So(err, ShouldBeNil)
                        fmt.Printf("json producer: %s\n", string(v))
                })
 
@@ -79,8 +80,7 @@ func TestHeartbeatData(t *testing.T) {
                        hbt.ConsumerDatas.Add(c2)
 
                        v, err := json.Marshal(hbt)
-                       //ShouldBeNil(t, err)
-                       assert.Nil(t, err)
+                       So(err, ShouldBeNil)
                        fmt.Printf("json consumer: %s\n", string(v))
                })
 
@@ -108,10 +108,257 @@ func TestHeartbeatData(t *testing.T) {
                        hbt.ConsumerDatas.Add(c2)
 
                        v, err := json.Marshal(hbt)
-                       //ShouldBeNil(t, err)
-                       assert.Nil(t, err)
+                       So(err, ShouldBeNil)
                        fmt.Printf("json producer & consumer: %s\n", string(v))
                })
        })
 
 }
+
+func TestConsumerRunningInfo_MarshalJSON(t *testing.T) {
+       Convey("test ConsumerRunningInfo MarshalJson", t, func() {
+               props := map[string]string{
+                       "maxReconsumeTimes":             "-1",
+                       "unitMode":                      "false",
+                       "adjustThreadPoolNumsThreshold": "100000",
+                       "consumerGroup":                 "mq-client-go-test%GID_GO_TEST",
+                       "messageModel":                  "CLUSTERING",
+                       "suspendCurrentQueueTimeMillis": "1000",
+                       "pullThresholdSizeForTopic":     "-1",
+                       "pullThresholdSizeForQueue":     "100",
+                       "PROP_CLIENT_VERSION":           "V4_5_1",
+                       "consumeConcurrentlyMaxSpan":    "2000",
+                       "postSubscriptionWhenPull":      "false",
+                       "consumeTimestamp":              "20191127013617",
+                       "PROP_CONSUME_TYPE":             "CONSUME_PASSIVELY",
+                       "consumeTimeout":                "15",
+                       "consumeMessageBatchMaxSize":    "1",
+                       "PROP_THREADPOOL_CORE_SIZE":     "20",
+                       "pullInterval":                  "0",
+                       "pullThresholdForQueue":         "1000",
+                       "pullThresholdForTopic":         "-1",
+                       "consumeFromWhere":              "CONSUME_FROM_FIRST_OFFSET",
+                       "PROP_NAMESERVER_ADDR":          "mq-client-go-test.mq-internet-access.mq-internet.aliyuncs.com:80;",
+                       "pullBatchSize":                 "32",
+                       "consumeThreadMin":              "20",
+                       "PROP_CONSUMER_START_TIMESTAMP": "1574791577504",
+                       "consumeThreadMax":              "20",
+                       "subscription":                  "{}",
+                       "PROP_CONSUMEORDERLY":           "false",
+               }
+               subData := map[*SubscriptionData]bool{
+                       &SubscriptionData{
+                               ClassFilterMode: false,
+                               Codes:           utils.NewSet(),
+                               ExpType:         "TAG",
+                               SubString:       "*",
+                               SubVersion:      1574791579242,
+                               Tags:            utils.NewSet(),
+                               Topic:           "%RETRY%mq-client-go-test%GID_GO_TEST",
+                       }: true,
+                       &SubscriptionData{
+                               ClassFilterMode: true,
+                               Codes:           utils.NewSet(),
+                               ExpType:         "TAG",
+                               SubString:       "*",
+                               SubVersion:      1574791577523,
+                               Tags:            utils.NewSet(),
+                               Topic:           "mq-client-go-test%go-test",
+                       }: true,
+               }
+               statusTable := map[string]ConsumeStatus{
+                       "%RETRY%mq-client-go-test%GID_GO_TEST": {
+                               PullRT:            11.11,
+                               PullTPS:           22.22,
+                               ConsumeRT:         33.33,
+                               ConsumeOKTPS:      44.44,
+                               ConsumeFailedTPS:  55.55,
+                               ConsumeFailedMsgs: 666,
+                       },
+                       "mq-client-go-test%go-test": {
+                               PullRT:            123,
+                               PullTPS:           123,
+                               ConsumeRT:         123,
+                               ConsumeOKTPS:      123,
+                               ConsumeFailedTPS:  123,
+                               ConsumeFailedMsgs: 1234,
+                       },
+               }
+               mqTable := map[primitive.MessageQueue]ProcessQueueInfo{
+                       {
+                               Topic:      "%RETRY%mq-client-go-test%GID_GO_TEST",
+                               BrokerName: "qd7internet-01",
+                               QueueId:    0,
+                       }: {
+                               CommitOffset:            0,
+                               CachedMsgMinOffset:      0,
+                               CachedMsgMaxOffset:      0,
+                               CachedMsgCount:          0,
+                               CachedMsgSizeInMiB:      0,
+                               TransactionMsgMinOffset: 0,
+                               TransactionMsgMaxOffset: 0,
+                               TransactionMsgCount:     0,
+                               Locked:                  false,
+                               TryUnlockTimes:          0,
+                               LastLockTimestamp:       1574791579221,
+                               Dropped:                 false,
+                               LastPullTimestamp:       1574791579242,
+                               LastConsumeTimestamp:    1574791579221,
+                       },
+                       {
+                               Topic:      "%RETRY%mq-client-go-test%GID_GO_TEST",
+                               BrokerName: "qd7internet-01",
+                               QueueId:    1,
+                       }: {
+                               CommitOffset:            1,
+                               CachedMsgMinOffset:      2,
+                               CachedMsgMaxOffset:      3,
+                               CachedMsgCount:          4,
+                               CachedMsgSizeInMiB:      5,
+                               TransactionMsgMinOffset: 6,
+                               TransactionMsgMaxOffset: 7,
+                               TransactionMsgCount:     8,
+                               Locked:                  true,
+                               TryUnlockTimes:          9,
+                               LastLockTimestamp:       1574791579221,
+                               Dropped:                 false,
+                               LastPullTimestamp:       1574791579242,
+                               LastConsumeTimestamp:    1574791579221,
+                       },
+               }
+               info := ConsumerRunningInfo{
+                       Properties:       props,
+                       SubscriptionData: subData,
+                       StatusTable:      statusTable,
+                       MQTable:          mqTable,
+               }
+               data, err := info.Encode()
+               So(err, ShouldBeNil)
+               result := gjson.ParseBytes(data)
+               Convey("test Properties fields", func() {
+                       r1 := result.Get("properties")
+                       So(r1.Exists(), ShouldBeTrue)
+                       m := r1.Map()
+                       So(len(m), ShouldEqual, 27)
+
+                       So(m["PROP_CLIENT_VERSION"], ShouldNotBeEmpty)
+                       So(m["PROP_CLIENT_VERSION"].String(), ShouldEqual, "V4_5_1")
+
+                       So(m["PROP_CONSUME_TYPE"], ShouldNotBeNil)
+                       So(m["PROP_CONSUME_TYPE"].String(), ShouldEqual, "CONSUME_PASSIVELY")
+
+                       So(m["PROP_THREADPOOL_CORE_SIZE"], ShouldNotBeNil)
+                       So(m["PROP_THREADPOOL_CORE_SIZE"].String(), ShouldEqual, "20")
+
+                       So(m["PROP_NAMESERVER_ADDR"], ShouldNotBeNil)
+                       So(m["PROP_NAMESERVER_ADDR"].String(), ShouldEqual, "mq-client-go-test.mq-internet-access.mq-internet.aliyuncs.com:80;")
+
+                       So(m["PROP_CONSUMER_START_TIMESTAMP"], ShouldNotBeNil)
+                       So(m["PROP_CONSUMER_START_TIMESTAMP"].String(), ShouldEqual, "1574791577504")
+
+                       So(m["PROP_CONSUMEORDERLY"], ShouldNotBeNil)
+                       So(m["PROP_CONSUMEORDERLY"].String(), ShouldEqual, "false")
+               })
+               Convey("test SubscriptionData fields", func() {
+                       r2 := result.Get("subscriptionSet")
+                       So(r2.Exists(), ShouldBeTrue)
+                       arr := r2.Array()
+                       So(len(arr), ShouldEqual, 2)
+
+                       m1 := arr[0].Map()
+                       So(len(m1), ShouldEqual, 7)
+                       So(m1["classFilterMode"].Bool(), ShouldEqual, false)
+                       So(len(m1["codes"].Array()), ShouldEqual, 0)
+                       So(m1["expressionType"].String(), ShouldEqual, "TAG")
+                       So(m1["subString"].String(), ShouldEqual, "*")
+                       So(m1["subVersion"].Int(), ShouldEqual, 1574791579242)
+                       So(len(m1["tags"].Array()), ShouldEqual, 0)
+                       So(m1["topic"].String(), ShouldEqual, "%RETRY%mq-client-go-test%GID_GO_TEST")
+
+                       m2 := arr[1].Map()
+                       So(len(m2), ShouldEqual, 7)
+                       So(m2["classFilterMode"].Bool(), ShouldEqual, true)
+                       So(len(m2["codes"].Array()), ShouldEqual, 0)
+                       So(m2["expressionType"].String(), ShouldEqual, "TAG")
+                       So(m2["subString"].String(), ShouldEqual, "*")
+                       So(m2["subVersion"].Int(), ShouldEqual, 1574791577523)
+                       So(len(m2["tags"].Array()), ShouldEqual, 0)
+                       So(m2["topic"].String(), ShouldEqual, "mq-client-go-test%go-test")
+               })
+               Convey("test StatusTable fields", func() {
+                       r3 := result.Get("statusTable")
+                       So(r3.Exists(), ShouldBeTrue)
+                       m := r3.Map()
+                       So(len(m), ShouldEqual, 2)
+
+                       status1 := m["mq-client-go-test%go-test"].Map()
+                       So(len(status1), ShouldEqual, 6)
+                       So(status1["pullRT"].Float(), ShouldEqual, 123)
+                       So(status1["pullTPS"].Float(), ShouldEqual, 123)
+                       So(status1["consumeRT"].Float(), ShouldEqual, 123)
+                       So(status1["consumeOKTPS"].Float(), ShouldEqual, 123)
+                       So(status1["consumeFailedTPS"].Float(), ShouldEqual, 123)
+                       So(status1["consumeFailedMsgs"].Int(), ShouldEqual, 1234)
+
+                       status2 := m["%RETRY%mq-client-go-test%GID_GO_TEST"].Map()
+                       So(len(status2), ShouldEqual, 6)
+                       So(status2["pullRT"].Float(), ShouldEqual, 11.11)
+                       So(status2["pullTPS"].Float(), ShouldEqual, 22.22)
+                       So(status2["consumeRT"].Float(), ShouldEqual, 33.33)
+                       So(status2["consumeOKTPS"].Float(), ShouldEqual, 44.44)
+                       So(status2["consumeFailedTPS"].Float(), ShouldEqual, 55.55)
+                       So(status2["consumeFailedMsgs"].Int(), ShouldEqual, 666)
+               })
+               Convey("test MQTable fields", func() {
+                       r4 := result.Get("mqTable")
+                       So(r4.Exists(), ShouldBeTrue)
+                       objNumbers := strings.Split(r4.String(), "},{")
+                       So(len(objNumbers), ShouldEqual, 2)
+
+                       obj1Str := objNumbers[0][1:len(objNumbers[0])] + "}"
+                       obj1KV := strings.Split(obj1Str, "}:{")
+                       So(len(obj1KV), ShouldEqual, 2)
+
+                       obj1 := gjson.Parse("{" + obj1KV[1][0:len(obj1KV[1])])
+                       So(obj1.Exists(), ShouldBeTrue)
+                       obj1M := obj1.Map()
+                       So(len(obj1M), ShouldEqual, 14)
+                       So(obj1M["commitOffset"].Int(), ShouldEqual, 0)
+                       So(obj1M["cachedMsgMinOffset"].Int(), ShouldEqual, 0)
+                       So(obj1M["cachedMsgMaxOffset"].Int(), ShouldEqual, 0)
+                       So(obj1M["cachedMsgCount"].Int(), ShouldEqual, 0)
+                       So(obj1M["cachedMsgSizeInMiB"].Int(), ShouldEqual, 0)
+                       So(obj1M["transactionMsgMinOffset"].Int(), ShouldEqual, 0)
+                       So(obj1M["transactionMsgMaxOffset"].Int(), ShouldEqual, 0)
+                       So(obj1M["transactionMsgCount"].Int(), ShouldEqual, 0)
+                       So(obj1M["locked"].Bool(), ShouldEqual, false)
+                       So(obj1M["tryUnlockTimes"].Int(), ShouldEqual, 0)
+                       So(obj1M["lastLockTimestamp"].Int(), ShouldEqual, 1574791579221)
+                       So(obj1M["dropped"].Bool(), ShouldEqual, false)
+                       So(obj1M["lastPullTimestamp"].Int(), ShouldEqual, 1574791579242)
+                       So(obj1M["lastConsumeTimestamp"].Int(), ShouldEqual, 1574791579221)
+
+                       obj2Str := "{" + objNumbers[1][0:len(objNumbers[1])-1]
+                       obj2KV := strings.Split(obj2Str, "}:{")
+                       So(len(obj2KV), ShouldEqual, 2)
+                       obj2 := gjson.Parse("{" + obj2KV[1][0:len(obj2KV[1])])
+                       So(obj2.Exists(), ShouldBeTrue)
+                       obj2M := obj2.Map()
+                       So(len(obj2M), ShouldEqual, 14)
+                       So(obj2M["commitOffset"].Int(), ShouldEqual, 1)
+                       So(obj2M["cachedMsgMinOffset"].Int(), ShouldEqual, 2)
+                       So(obj2M["cachedMsgMaxOffset"].Int(), ShouldEqual, 3)
+                       So(obj2M["cachedMsgCount"].Int(), ShouldEqual, 4)
+                       So(obj2M["cachedMsgSizeInMiB"].Int(), ShouldEqual, 5)
+                       So(obj2M["transactionMsgMinOffset"].Int(), ShouldEqual, 6)
+                       So(obj2M["transactionMsgMaxOffset"].Int(), ShouldEqual, 7)
+                       So(obj2M["transactionMsgCount"].Int(), ShouldEqual, 8)
+                       So(obj2M["locked"].Bool(), ShouldEqual, true)
+                       So(obj2M["tryUnlockTimes"].Int(), ShouldEqual, 9)
+                       So(obj2M["lastLockTimestamp"].Int(), ShouldEqual, 1574791579221)
+                       So(obj2M["dropped"].Bool(), ShouldEqual, false)
+                       So(obj2M["lastPullTimestamp"].Int(), ShouldEqual, 1574791579242)
+                       So(obj2M["lastConsumeTimestamp"].Int(), ShouldEqual, 1574791579221)
+               })
+       })
+}
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 3c20968..b9f1744 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -52,6 +52,8 @@ type Namesrvs interface {
        FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult
 
        FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue, error)
+
+       AddrList() []string
 }
 
 // namesrvs rocketmq namesrv instance.
@@ -119,3 +121,7 @@ func (s *namesrvs) String() string {
 func (s *namesrvs) SetCredentials(credentials primitive.Credentials) {
        s.nameSrvClient.RegisterInterceptor(remote.ACLInterceptor(credentials))
 }
+
+func (s *namesrvs) AddrList() []string {
+       return s.srvs
+}
diff --git a/internal/request.go b/internal/request.go
index 76d0d7d..470fd35 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -78,10 +78,6 @@ func (request *SendMessageRequestHeader) Encode() map[string]string {
        return maps
 }
 
-func (request *SendMessageRequestHeader) Decode(properties map[string]string) error {
-       return nil
-}
-
 type EndTransactionRequestHeader struct {
        ProducerGroup        string
        TranStateTableOffset int64
@@ -123,23 +119,23 @@ func (request *CheckTransactionStateRequestHeader) Encode() map[string]string {
        return maps
 }
 
-func (request *CheckTransactionStateRequestHeader) Decode(ext map[string]string) {
-       if len(ext) == 0 {
+func (request *CheckTransactionStateRequestHeader) Decode(properties map[string]string) {
+       if len(properties) == 0 {
                return
        }
-       if v, existed := ext["tranStateTableOffset"]; existed {
+       if v, existed := properties["tranStateTableOffset"]; existed {
                request.TranStateTableOffset, _ = strconv.ParseInt(v, 10, 0)
        }
-       if v, existed := ext["commitLogOffset"]; existed {
+       if v, existed := properties["commitLogOffset"]; existed {
                request.CommitLogOffset, _ = strconv.ParseInt(v, 10, 0)
        }
-       if v, existed := ext["msgId"]; existed {
+       if v, existed := properties["msgId"]; existed {
                request.MsgId = v
        }
-       if v, existed := ext["transactionId"]; existed {
+       if v, existed := properties["transactionId"]; existed {
                request.MsgId = v
        }
-       if v, existed := ext["offsetMsgId"]; existed {
+       if v, existed := properties["offsetMsgId"]; existed {
                request.MsgId = v
        }
 }
@@ -273,6 +269,27 @@ func (request *GetRouteInfoRequestHeader) Encode() map[string]string {
        return maps
 }
 
-func (request *GetRouteInfoRequestHeader) Decode(properties map[string]string) error {
-       return nil
+type GetConsumerRunningInfoHeader struct {
+       consumerGroup string
+       clientID      string
+}
+
+func (request *GetConsumerRunningInfoHeader) Encode() map[string]string {
+       maps := make(map[string]string)
+       maps["consumerGroup"] = request.consumerGroup
+       maps["clientId"] = request.clientID
+       return maps
+}
+
+func (request *GetConsumerRunningInfoHeader) Decode(properties map[string]string) {
+       if len(properties) == 0 {
+               return
+       }
+       if v, existed := properties["consumerGroup"]; existed {
+               request.consumerGroup = v
+       }
+
+       if v, existed := properties["clientId"]; existed {
+               request.clientID = v
+       }
 }

Reply via email to