Repository: incubator-rocketmq-externals
Updated Branches:
  refs/heads/master 7ceba1b23 -> aaa0758e6


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/service/mq_client.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/mq_client.go b/rocketmq-go/service/mq_client.go
index 0cf7df2..8bbfe79 100644
--- a/rocketmq-go/service/mq_client.go
+++ b/rocketmq-go/service/mq_client.go
@@ -16,5 +16,348 @@
  */
 package service
 
+import (
+       "encoding/json"
+       "errors"
+       "fmt"
+       "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+       
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
+       "github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
+       "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+       "github.com/golang/glog"
+       "os"
+       "strconv"
+       "strings"
+       "time"
+)
+
+// RocketMqClient is the interface for the operations shared across the client;
+// MqClientImpl in this file provides the implementations. It covers:
+// 1. broker address lookup
+// 2. topic route information
+// 3. topic subscribe (message queue) information
+// 4. heartbeats to brokers
+// NOTE(review): GetRemotingClient is declared here as returning
+// *remoting.DefalutRemotingClient, while MqClientImpl.GetRemotingClient returns
+// remoting.RemotingClient -- confirm which signature is intended.
 type RocketMqClient interface {
+       GetClientId() (clientId string)
+       GetRemotingClient() (remotingClient *remoting.DefalutRemotingClient)
+       GetTopicSubscribeInfo(topic string) (messageQueueList 
[]*model.MessageQueue)
+       GetPublishTopicList() []string
+       FetchMasterBrokerAddress(brokerName string) (masterAddress string)
+       EnqueuePullMessageRequest(pullRequest *model.PullRequest)
+       DequeuePullMessageRequest() (pullRequest *model.PullRequest)
+       FindBrokerAddressInSubscribe(brokerName string, brokerId int, 
onlyThisBroker bool) (brokerAddr string, slave bool, found bool)
+       TryToFindTopicPublishInfo(topic string) (topicPublicInfo 
*model.TopicPublishInfo, err error)
+       FindBrokerAddrByTopic(topic string) (addr string, ok bool)
+       UpdateTopicRouteInfoFromNameServer(topic string) (err error)
+       UpdateTopicRouteInfoFromNameServerUseDefaultTopic(topic string) (err 
error)
+       SendHeartbeatToAllBroker(heartBeatData *model.HeartbeatData) (err error)
+       ClearExpireResponse()
+       GetMaxOffset(mq *model.MessageQueue) int64
+       SearchOffset(mq *model.MessageQueue, time time.Time) int64
+}
+
+// DEFAULT_TIMEOUT is the timeout passed to InvokeSync by GetMaxOffset and
+// SearchOffset (presumably milliseconds, like the timeoutMillis arguments used
+// elsewhere in this file -- confirm against the remoting package).
+var DEFAULT_TIMEOUT int64 = 6000
+
+// MqClientImpl holds the state shared by the client operations in this file:
+// the client id, the remoting client used for requests, and concurrent maps of
+// route, broker, publish and subscribe information.
+type MqClientImpl struct {
+       ClientId                string
+       remotingClient          remoting.RemotingClient
+       TopicRouteTable         util.ConcurrentMap      // topic -> *model.TopicRouteData
+       BrokerAddrTable         util.ConcurrentMap      // brokerName -> map[brokerId]address (read as map[string]string in this file)
+       TopicPublishInfoTable   util.ConcurrentMap      // topic -> *model.TopicPublishInfo //all use this
+       TopicSubscribeInfoTable util.ConcurrentMap      // topic -> []*model.MessageQueue
+       PullRequestQueue        chan *model.PullRequest //todo move
+}
+
+// MqClientInit builds a MqClientImpl: it assigns a client id, creates the
+// empty route/broker/publish/subscribe tables, initialises the remoting client
+// from clientConfig and clientRequestProcessor, and allocates a pull-request
+// queue buffered for 1024 entries.
+func MqClientInit(clientConfig *config.ClientConfig, clientRequestProcessor remoting.ClientRequestProcessor) (mqClientImpl *MqClientImpl) {
+       // Calls inside a composite literal run in lexical order, so the id is
+       // built first and the queue is allocated last.
+       return &MqClientImpl{
+               ClientId:                buildMqClientImplId(),
+               TopicRouteTable:         util.New(), // map[string]*model.TopicRouteData
+               BrokerAddrTable:         util.New(), // map[string]map[int]string
+               remotingClient:          remoting.RemotingClientInit(clientConfig, clientRequestProcessor),
+               TopicPublishInfoTable:   util.New(), // map[string]*model.TopicPublishInfo
+               TopicSubscribeInfoTable: util.New(), // map[string][]*model.MessageQueue
+               PullRequestQueue:        make(chan *model.PullRequest, 1024),
+       }
+}
+// GetTopicSubscribeInfo returns the message queues cached for topic in
+// TopicSubscribeInfoTable, or nil when the topic has no entry.
+func (self *MqClientImpl) GetTopicSubscribeInfo(topic string) (messageQueueList []*model.MessageQueue) {
+       cached, found := self.TopicSubscribeInfoTable.Get(topic)
+       if !found {
+               return nil
+       }
+       return cached.([]*model.MessageQueue)
+}
+// GetMaxOffset asks the master broker of mq for the queue's maximum offset.
+// If the master address is not cached it refreshes the topic's publish info
+// once and looks the address up again. It returns -1 when the master address
+// is still unknown, when the request fails, or when no response is returned.
+func (self *MqClientImpl) GetMaxOffset(mq *model.MessageQueue) int64 {
+       brokerAddr := self.FetchMasterBrokerAddress(mq.BrokerName)
+       if len(brokerAddr) == 0 {
+               self.TryToFindTopicPublishInfo(mq.Topic)
+               brokerAddr = self.FetchMasterBrokerAddress(mq.BrokerName)
+       }
+       if len(brokerAddr) == 0 {
+               // An empty address is what this file passes to InvokeSync for
+               // name-server requests, so never send a broker command with it.
+               glog.Warning("op=look max offset, master broker address not found, brokerName=", mq.BrokerName)
+               return -1
+       }
+       getMaxOffsetRequestHeader := &header.GetMaxOffsetRequestHeader{Topic: mq.Topic, QueueId: mq.QueueId}
+       remotingCmd := remoting.NewRemotingCommand(remoting.GET_MAX_OFFSET, getMaxOffsetRequestHeader)
+       response, err := self.remotingClient.InvokeSync(brokerAddr, remotingCmd, DEFAULT_TIMEOUT)
+       if err != nil || response == nil {
+               return -1
+       }
+       queryOffsetResponseHeader := header.QueryOffsetResponseHeader{}
+       queryOffsetResponseHeader.FromMap(response.ExtFields)
+       glog.Info("op=look max offset result", string(response.Body))
+       return queryOffsetResponseHeader.Offset
+}
+// SearchOffset asks the master broker of mq for the offset matching the given
+// time, sent as a millisecond timestamp. If the master address is not cached
+// it refreshes the topic's publish info once and looks the address up again.
+// It returns -1 when the master address is still unknown, when the request
+// fails, or when no response is returned.
+func (self *MqClientImpl) SearchOffset(mq *model.MessageQueue, time time.Time) int64 {
+       brokerAddr := self.FetchMasterBrokerAddress(mq.BrokerName)
+       if len(brokerAddr) == 0 {
+               self.TryToFindTopicPublishInfo(mq.Topic)
+               brokerAddr = self.FetchMasterBrokerAddress(mq.BrokerName)
+       }
+       if len(brokerAddr) == 0 {
+               // An empty address is what this file passes to InvokeSync for
+               // name-server requests, so never send a broker command with it.
+               glog.Warning("op=look search offset, master broker address not found, brokerName=", mq.BrokerName)
+               return -1
+       }
+       // UnixNano is in nanoseconds; the request carries milliseconds.
+       timeStamp := time.UnixNano() / 1000000
+       searchOffsetRequestHeader := &header.SearchOffsetRequestHeader{Topic: mq.Topic, QueueId: mq.QueueId, Timestamp: timeStamp}
+       remotingCmd := remoting.NewRemotingCommand(remoting.SEARCH_OFFSET_BY_TIMESTAMP, searchOffsetRequestHeader)
+       response, err := self.remotingClient.InvokeSync(brokerAddr, remotingCmd, DEFAULT_TIMEOUT)
+       if err != nil || response == nil {
+               return -1
+       }
+       queryOffsetResponseHeader := header.QueryOffsetResponseHeader{}
+       queryOffsetResponseHeader.FromMap(response.ExtFields)
+       glog.Info("op=look search offset result", string(response.Body))
+       return queryOffsetResponseHeader.Offset
+}
+// GetClientId returns the ClientId field (set by MqClientInit).
+func (self *MqClientImpl) GetClientId() string {
+       return self.ClientId
+}
+func (self *MqClientImpl) GetPublishTopicList() []string {
+       var publishTopicList []string
+       for _, topic := range self.TopicPublishInfoTable.Keys() {
+               publishTopicList = append(publishTopicList, topic)
+       }
+       return publishTopicList
+}
+// GetRemotingClient returns the remoting client used for all requests.
+// NOTE(review): the RocketMqClient interface declares this method with the
+// result type *remoting.DefalutRemotingClient -- confirm which is intended.
+func (self *MqClientImpl) GetRemotingClient() remoting.RemotingClient {
+       return self.remotingClient
+}
+
+// EnqueuePullMessageRequest puts pullRequest on PullRequestQueue. The send
+// blocks while the channel's buffer is full.
+func (self *MqClientImpl) EnqueuePullMessageRequest(pullRequest *model.PullRequest) {
+       self.PullRequestQueue <- pullRequest
+}
+// DequeuePullMessageRequest takes the next pull request off PullRequestQueue,
+// blocking until one is available.
+func (self *MqClientImpl) DequeuePullMessageRequest() (pullRequest *model.PullRequest) {
+       return <-self.PullRequestQueue
+}
+
+// ClearExpireResponse is currently a no-op: the delegation to the remoting
+// client below is commented out.
+func (self *MqClientImpl) ClearExpireResponse() {
+       //self.remotingClient.ClearExpireResponse()
+}
+
+// FetchMasterBrokerAddress returns the address stored under broker id "0" for
+// brokerName in BrokerAddrTable, or "" when the broker name or that id is not
+// present.
+func (self *MqClientImpl) FetchMasterBrokerAddress(brokerName string) (masterAddress string) {
+       entry, known := self.BrokerAddrTable.Get(brokerName)
+       if !known {
+               return ""
+       }
+       addrsById := entry.(map[string]string)
+       return addrsById["0"]
+}
+// TryToFindTopicPublishInfo returns the publish info for topic.
+//
+// It first consults TopicPublishInfoTable. If the entry is missing or not
+// usable, it stores a placeholder and refreshes the route from the name
+// server. If there is still no usable entry, it falls back to the route of the
+// default topic. err is the error of the last refresh attempted; the returned
+// info can be nil when nothing is cached for the topic.
+func (self *MqClientImpl) TryToFindTopicPublishInfo(topic string) (topicPublicInfo *model.TopicPublishInfo, err error) {
+       if value, ok := self.TopicPublishInfoTable.Get(topic); ok {
+               topicPublicInfo = value.(*model.TopicPublishInfo)
+       }
+
+       if topicPublicInfo == nil || !topicPublicInfo.JudgeTopicPublishInfoOk() {
+               self.TopicPublishInfoTable.Set(topic, &model.TopicPublishInfo{HaveTopicRouterInfo: false})
+               err = self.UpdateTopicRouteInfoFromNameServer(topic)
+               if err != nil {
+                       glog.Warning(err) // if updateRouteInfo error, maybe we can use the defaultTopic
+               }
+               if value, ok := self.TopicPublishInfoTable.Get(topic); ok {
+                       topicPublicInfo = value.(*model.TopicPublishInfo)
+               }
+       }
+       // Guard against a nil entry before touching its fields.
+       if topicPublicInfo != nil && topicPublicInfo.HaveTopicRouterInfo && topicPublicInfo.JudgeTopicPublishInfoOk() {
+               return
+       }
+       //try to use the defaultTopic
+       err = self.UpdateTopicRouteInfoFromNameServerUseDefaultTopic(topic)
+
+       if value, ok := self.TopicPublishInfoTable.Get(topic); ok {
+               topicPublicInfo = value.(*model.TopicPublishInfo)
+       }
+
+       return
+}
+
+func (self MqClientImpl) GetTopicRouteInfoFromNameServer(topic string, 
timeoutMillis int64) (*model.TopicRouteData, error) {
+       requestHeader := &header.GetRouteInfoRequestHeader{
+               Topic: topic,
+       }
+       var remotingCommand = 
remoting.NewRemotingCommand(remoting.GET_ROUTEINTO_BY_TOPIC, requestHeader)
+       response, err := self.remotingClient.InvokeSync("", remotingCommand, 
timeoutMillis)
+
+       if err != nil {
+               return nil, err
+       }
+       if response.Code == remoting.SUCCESS {
+               //todo  it's dirty
+               topicRouteData := new(model.TopicRouteData)
+               bodyjson := strings.Replace(string(response.Body), ",0:", 
",\"0\":", -1)
+               bodyjson = strings.Replace(bodyjson, ",1:", ",\"1\":", -1) // 
fastJson的key没有引号 需要通用的方法
+               bodyjson = strings.Replace(bodyjson, "{0:", "{\"0\":", -1)
+               bodyjson = strings.Replace(bodyjson, "{1:", "{\"1\":", -1)
+               err = json.Unmarshal([]byte(bodyjson), topicRouteData)
+               if err != nil {
+                       glog.Error(err, bodyjson)
+                       return nil, err
+               }
+               return topicRouteData, nil
+       } else {
+               return nil, errors.New(fmt.Sprintf("get topicRouteInfo from 
nameServer error[code:%d,topic:%s]", response.Code, topic))
+       }
+}
+
+func (self MqClientImpl) FindBrokerAddressInSubscribe(brokerName string, 
brokerId int, onlyThisBroker bool) (brokerAddr string, slave bool, found bool) {
+       slave = false
+       found = false
+       value, ok := self.BrokerAddrTable.Get(brokerName)
+       if !ok {
+               return
+       }
+       brokerMap := value.(map[string]string)
+       //self.brokerAddrTableLock.RUnlock()
+       brokerAddr, ok = brokerMap[util.IntToString(brokerId)]
+       slave = (brokerId != 0)
+       found = ok
+
+       if !found && !onlyThisBroker {
+               var id string
+               for id, brokerAddr = range brokerMap {
+                       slave = (id != "0")
+                       found = true
+                       break
+               }
+       }
+       return
+}
+
+func (self MqClientImpl) UpdateTopicRouteInfoFromNameServer(topic string) (err 
error) {
+       var (
+               topicRouteData *model.TopicRouteData
+       )
+       //namesvr lock
+       //topicRouteData = 
this.MqClientImplAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
+       topicRouteData, err = self.GetTopicRouteInfoFromNameServer(topic, 
1000*3)
+       if err != nil {
+               return
+       }
+       self.updateTopicRouteInfoLocal(topic, topicRouteData)
+       return
+}
+func (self MqClientImpl) 
UpdateTopicRouteInfoFromNameServerUseDefaultTopic(topic string) (err error) {
+       var (
+               topicRouteData *model.TopicRouteData
+       )
+       //namesvr lock
+       //topicRouteData = 
this.MqClientImplAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
+       topicRouteData, err = 
self.GetTopicRouteInfoFromNameServer(constant.DEFAULT_TOPIC, 1000*3)
+       if err != nil {
+               return
+       }
+
+       for _, queueData := range topicRouteData.QueueDatas {
+               defaultQueueData := constant.DEFAULT_TOPIC_QUEUE_NUMS
+               if queueData.ReadQueueNums < defaultQueueData {
+                       defaultQueueData = queueData.ReadQueueNums
+               }
+               queueData.ReadQueueNums = defaultQueueData
+               queueData.WriteQueueNums = defaultQueueData
+       }
+       self.updateTopicRouteInfoLocal(topic, topicRouteData)
+       return
+}
+// updateTopicRouteInfoLocal stores topicRouteData in the local tables: broker
+// addresses per broker name, then the publish info, subscribe info and raw
+// route data for topic. A nil topicRouteData is ignored. It always returns nil.
+func (self MqClientImpl) updateTopicRouteInfoLocal(topic string, topicRouteData *model.TopicRouteData) (err error) {
+       if topicRouteData == nil {
+               return nil
+       }
+       // todo: compare with the cached route data and skip the update when
+       // nothing changed; for now every call updates.
+
+       //update brokerAddrTable
+       for _, broker := range topicRouteData.BrokerDatas {
+               self.BrokerAddrTable.Set(broker.BrokerName, broker.BrokerAddrs)
+       }
+
+       //update pubInfo for each
+       self.TopicPublishInfoTable.Set(topic, model.BuildTopicPublishInfoFromTopicRoteData(topic, topicRouteData))
+       self.TopicSubscribeInfoTable.Set(topic, model.BuildTopicSubscribeInfoFromRoteData(topic, topicRouteData))
+       self.TopicRouteTable.Set(topic, topicRouteData)
+       return nil
+}
+
+// FindBrokerAddrByTopic returns a broker address for topic taken from the
+// first broker entry of the cached route data. The address under id "0" is
+// preferred; otherwise any address of that broker is returned. ok is false
+// when the topic has no cached route, no brokers, or no addresses.
+func (self MqClientImpl) FindBrokerAddrByTopic(topic string) (addr string, ok bool) {
+       value, findValue := self.TopicRouteTable.Get(topic)
+       if !findValue {
+               return "", false
+       }
+       brokers := value.(*model.TopicRouteData).BrokerDatas
+       if len(brokers) == 0 {
+               return "", false
+       }
+
+       brokerData := brokers[0]
+       // Hold the read lock for the fallback scan too, not just the "0" lookup.
+       brokerData.BrokerAddrsLock.RLock()
+       defer brokerData.BrokerAddrsLock.RUnlock()
+
+       if addr, ok = brokerData.BrokerAddrs["0"]; ok {
+               return addr, true
+       }
+       // No entry for id "0": report whichever address is available as found.
+       for _, addr = range brokerData.BrokerAddrs {
+               return addr, true
+       }
+       return "", false
+}
+
+// buildMqClientImplId builds the client id as "<local ipv4>@<pid>".
+func buildMqClientImplId() (clientId string) {
+       pid := strconv.Itoa(os.Getpid())
+       return util.GetLocalIp4() + "@" + pid
+}
+
+// sendHeartBeat sends remotingCommand to addr and logs any failure. Only a
+// transport error from InvokeSync is returned; a nil response or a response
+// whose code is not SUCCESS is logged but still yields a nil error.
+func (self MqClientImpl) sendHeartBeat(addr string, remotingCommand *remoting.RemotingCommand, timeoutMillis int64) error {
+       response, err := self.remotingClient.InvokeSync(addr, remotingCommand, timeoutMillis)
+       if err != nil {
+               glog.Error(err)
+               return err
+       }
+       if response == nil || response.Code != remoting.SUCCESS {
+               glog.Error("send heartbeat response  error")
+       }
+       return nil
+}
+
+// SendHeartbeatToAllBroker sends heartBeatData, encoded as JSON, to the
+// address stored under broker id "0" of every broker in BrokerAddrTable.
+// Brokers with no such address are skipped. It returns an error only when the
+// data cannot be encoded; individual send failures are logged by
+// sendHeartBeat and do not stop the loop.
+func (self MqClientImpl) SendHeartbeatToAllBroker(heartBeatData *model.HeartbeatData) (err error) {
+       // The payload is the same for every broker, so encode it once.
+       data, err := json.Marshal(heartBeatData)
+       if err != nil {
+               glog.Error(err)
+               return err
+       }
+
+       for _, brokerTable := range self.BrokerAddrTable.Items() {
+               addr := brokerTable.(map[string]string)["0"]
+               if len(addr) == 0 {
+                       continue
+               }
+               glog.V(2).Info("send heartbeat to broker look data[", string(data)+"]")
+               remotingCommand := remoting.NewRemotingCommandWithBody(remoting.HEART_BEAT, nil, data)
+               glog.V(2).Info("send heartbeat to broker[", addr+"]")
+               // Best effort: the result is logged inside sendHeartBeat.
+               self.sendHeartBeat(addr, remotingCommand, 3000)
+       }
+       return nil
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/util/concurrent_map.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/util/concurrent_map.go 
b/rocketmq-go/util/concurrent_map.go
new file mode 100644
index 0000000..2fbe9bf
--- /dev/null
+++ b/rocketmq-go/util/concurrent_map.go
@@ -0,0 +1,278 @@
+package util
+
+import (
+       "encoding/json"
+       "sync"
+)
+
+// SHARD_COUNT is the number of shards New allocates for each ConcurrentMap.
+// NOTE(review): it is a mutable package variable; treat it as fixed once any
+// map has been created.
+var SHARD_COUNT = 33
+
+// ConcurrentMap is a "thread" safe map of type string:Anything.
+// To avoid lock bottlenecks this map is divided into several (SHARD_COUNT) map shards.
+type ConcurrentMap []*concurrentMapShared
+
+// concurrentMapShared is one shard of a ConcurrentMap: a plain string-keyed
+// map together with the embedded RWMutex that guards it.
+type concurrentMapShared struct {
+       items        map[string]interface{}
+       sync.RWMutex // Read Write mutex, guards access to internal map.
+}
+
+// New creates an empty concurrent map with SHARD_COUNT initialised shards.
+func New() ConcurrentMap {
+       shards := make(ConcurrentMap, SHARD_COUNT)
+       for idx := range shards {
+               shards[idx] = &concurrentMapShared{items: make(map[string]interface{})}
+       }
+       return shards
+}
+
+// GetShard returns the shard responsible for key, chosen by the fnv32 hash of
+// the key modulo the number of shards this map actually has. Using the map's
+// own length keeps lookups correct even if SHARD_COUNT is modified after the
+// map was created.
+func (m ConcurrentMap) GetShard(key string) *concurrentMapShared {
+       return m[uint(fnv32(key))%uint(len(m))]
+}
+
+// MSet stores every key/value pair of data in the map. Each pair is written
+// under its own shard lock, so the batch as a whole is not atomic.
+func (m ConcurrentMap) MSet(data map[string]interface{}) {
+       for k, v := range data {
+               target := m.GetShard(k)
+               target.Lock()
+               target.items[k] = v
+               target.Unlock()
+       }
+}
+
+// Set stores value under key, replacing any existing entry.
+func (m *ConcurrentMap) Set(key string, value interface{}) {
+       // Only the shard owning the key is locked.
+       target := m.GetShard(key)
+       target.Lock()
+       target.items[key] = value
+       target.Unlock()
+}
+
+// UpsertCb is the callback that returns the new element to be inserted into
+// the map. It is called while the shard lock is held, therefore it MUST NOT
+// try to access other keys in the same map, as that can lead to deadlock since
+// Go's sync.RWMutex is not reentrant.
+type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) 
interface{}
+
+// Upsert (insert or update) updates the existing element or inserts a new one
+// using cb. cb receives whether key exists, the current value and the value
+// argument; its result is stored under key and returned. The shard lock is
+// released with defer so that a panicking callback cannot leave the shard
+// locked.
+func (m *ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) {
+       shard := m.GetShard(key)
+       shard.Lock()
+       defer shard.Unlock()
+       v, ok := shard.items[key]
+       res = cb(ok, v, value)
+       shard.items[key] = res
+       return res
+}
+
+// SetIfAbsent stores value under key only when the key has no value yet. It
+// reports whether the value was stored.
+func (m *ConcurrentMap) SetIfAbsent(key string, value interface{}) bool {
+       target := m.GetShard(key)
+       target.Lock()
+       defer target.Unlock()
+       if _, present := target.items[key]; present {
+               return false
+       }
+       target.items[key] = value
+       return true
+}
+
+// Get returns the element stored under key and whether it was present.
+func (m ConcurrentMap) Get(key string) (interface{}, bool) {
+       source := m.GetShard(key)
+       source.RLock()
+       defer source.RUnlock()
+       element, present := source.items[key]
+       return element, present
+}
+
+// Count returns the number of elements within the map. Shards are locked one
+// at a time, so the total is not a consistent snapshot under concurrent
+// writes. It ranges over the map's own shards instead of indexing up to
+// SHARD_COUNT, so a later change of that variable cannot cause an
+// out-of-range access or a partial count.
+func (m ConcurrentMap) Count() int {
+       count := 0
+       for _, shard := range m {
+               shard.RLock()
+               count += len(shard.items)
+               shard.RUnlock()
+       }
+       return count
+}
+
+// Has reports whether an item is stored under key.
+func (m *ConcurrentMap) Has(key string) bool {
+       source := m.GetShard(key)
+       source.RLock()
+       defer source.RUnlock()
+       _, present := source.items[key]
+       return present
+}
+
+// Remove deletes the element stored under key, if any.
+func (m *ConcurrentMap) Remove(key string) {
+       target := m.GetShard(key)
+       target.Lock()
+       defer target.Unlock()
+       delete(target.items, key)
+}
+
+// Pop removes the element stored under key and returns it together with a
+// flag telling whether it existed.
+func (m *ConcurrentMap) Pop(key string) (v interface{}, exists bool) {
+       target := m.GetShard(key)
+       target.Lock()
+       defer target.Unlock()
+       v, exists = target.items[key]
+       delete(target.items, key)
+       return v, exists
+}
+
+// IsEmpty reports whether the map holds no elements, i.e. Count() is 0.
+func (m *ConcurrentMap) IsEmpty() bool {
+       return m.Count() == 0
+}
+
+// Tuple is used by the Iter & IterBuffered functions to wrap a key and its
+// value together over a channel.
+type Tuple struct {
+       Key string
+       Val interface{}
+}
+
+// Iter returns an unbuffered channel which could be used in a for range loop.
+// One goroutine per shard sends that shard's items while holding the shard's
+// read lock, and the channel is closed once every shard has been sent. A
+// consumer that stops reading leaves those goroutines blocked with the read
+// lock held.
+//
+// Deprecated: using IterBuffered() will get a better performance
+func (m ConcurrentMap) Iter() <-chan Tuple {
+       ch := make(chan Tuple)
+       go func() {
+               wg := sync.WaitGroup{}
+               // Wait for exactly as many workers as are started below.
+               wg.Add(len(m))
+               // Foreach shard.
+               for _, shard := range m {
+                       go func(shard *concurrentMapShared) {
+                               defer wg.Done()
+                               // Foreach key, value pair.
+                               shard.RLock()
+                               defer shard.RUnlock()
+                               for key, val := range shard.items {
+                                       ch <- Tuple{key, val}
+                               }
+                       }(shard)
+               }
+               wg.Wait()
+               close(ch)
+       }()
+       return ch
+}
+
+// IterBuffered returns a buffered channel which could be used in a for range
+// loop. The buffer is sized from Count() taken before iteration starts. One
+// goroutine per shard sends that shard's items while holding the shard's read
+// lock, and the channel is closed once every shard has been sent.
+func (m ConcurrentMap) IterBuffered() <-chan Tuple {
+       ch := make(chan Tuple, m.Count())
+       go func() {
+               wg := sync.WaitGroup{}
+               // Wait for exactly as many workers as are started below.
+               wg.Add(len(m))
+               // Foreach shard.
+               for _, shard := range m {
+                       go func(shard *concurrentMapShared) {
+                               defer wg.Done()
+                               // Foreach key, value pair.
+                               shard.RLock()
+                               defer shard.RUnlock()
+                               for key, val := range shard.items {
+                                       ch <- Tuple{key, val}
+                               }
+                       }(shard)
+               }
+               wg.Wait()
+               close(ch)
+       }()
+       return ch
+}
+
+// Items returns all items as a freshly built map[string]interface{}.
+func (m ConcurrentMap) Items() map[string]interface{} {
+       snapshot := make(map[string]interface{})
+       // Drain the buffered iterator into the plain map.
+       for tuple := range m.IterBuffered() {
+               snapshot[tuple.Key] = tuple.Val
+       }
+       return snapshot
+}
+
+// IterCb is the iterator callback, called for every key,value found in
+// maps. RLock is held for all calls for a given shard,
+// therefore the callback sees a consistent view of a shard,
+// but not across the shards.
+type IterCb func(key string, v interface{})
+
+// IterCb is the callback based iterator, the cheapest way to read all elements
+// in a map. fn runs while the read lock of the shard being visited is held.
+func (m *ConcurrentMap) IterCb(fn IterCb) {
+       for _, shard := range *m {
+               shard.RLock()
+               for k, v := range shard.items {
+                       fn(k, v)
+               }
+               shard.RUnlock()
+       }
+}
+
+// Keys returns all keys as []string. The keys are collected by one goroutine
+// per shard, so their order is unspecified.
+func (m ConcurrentMap) Keys() []string {
+       count := m.Count()
+       ch := make(chan string, count)
+       go func() {
+               // Foreach shard.
+               wg := sync.WaitGroup{}
+               // Wait for exactly as many workers as are started below.
+               wg.Add(len(m))
+               for _, shard := range m {
+                       go func(shard *concurrentMapShared) {
+                               defer wg.Done()
+                               // Foreach key, value pair.
+                               shard.RLock()
+                               defer shard.RUnlock()
+                               for key := range shard.items {
+                                       ch <- key
+                               }
+                       }(shard)
+               }
+               wg.Wait()
+               close(ch)
+       }()
+
+       keys := make([]string, 0, count)
+       for k := range ch {
+               keys = append(keys, k)
+       }
+       return keys
+}
+
+// MarshalJSON reveals ConcurrentMap "private" variables to json marshal: the
+// items spread across the shards are gathered into one temporary map, which
+// is then encoded.
+func (m ConcurrentMap) MarshalJSON() ([]byte, error) {
+       return json.Marshal(m.Items())
+}
+
+// fnv32 hashes key with a 32-bit FNV-style loop: starting from the offset
+// basis 2166136261, each byte multiplies the hash by the prime 16777619 and is
+// then XOR-ed in.
+func fnv32(key string) uint32 {
+       const (
+               offset32 = uint32(2166136261)
+               prime32  = uint32(16777619)
+       )
+       sum := offset32
+       for _, b := range []byte(key) {
+               sum = (sum * prime32) ^ uint32(b)
+       }
+       return sum
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/util/ip.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/util/ip.go b/rocketmq-go/util/ip.go
new file mode 100644
index 0000000..f87000d
--- /dev/null
+++ b/rocketmq-go/util/ip.go
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package util
+
+import (
+       "net"
+       "strings"
+)
+
+// GetIp4Bytes returns the 4-byte form of the address chosen by getIp, or nil
+// when getIp finds no address (slicing a nil address would panic).
+func GetIp4Bytes() (ret []byte) {
+       // To4 yields nil for a nil or non-IPv4 address, and otherwise the same
+       // trailing four bytes that a manual slice would give.
+       return getIp().To4()
+}
+
+// GetLocalIp4 returns the address chosen by getIp in dotted form, or "" when
+// there is none or it does not pass the IPv4/non-loopback/intranet checks.
+func GetLocalIp4() (ip4 string) {
+       ip := getIp()
+       if ip.To4() == nil {
+               return ""
+       }
+       candidate := ip.String()
+       if strings.Contains(candidate, ":") || candidate == "127.0.0.1" || !isIntranetIpv4(candidate) {
+               return ""
+       }
+       return candidate
+}
+// getIp scans the network interfaces and returns the last non-loopback IPv4
+// address that isIntranetIpv4 accepts, or nil when there is none or the
+// interface list cannot be read.
+func getIp() (ip net.IP) {
+       interfaces, err := net.Interfaces()
+       if err != nil {
+               return
+       }
+
+       for _, face := range interfaces {
+               // Use the loopback flag rather than matching "lo" in the name,
+               // which also skipped interfaces such as wlo1.
+               if face.Flags&net.FlagLoopback != 0 {
+                       continue
+               }
+               addrs, err := face.Addrs()
+               if err != nil {
+                       // One unreadable interface should not hide the others.
+                       continue
+               }
+
+               for _, addr := range addrs {
+                       ipnet, ok := addr.(*net.IPNet)
+                       if !ok || ipnet.IP.IsLoopback() || ipnet.IP.To4() == nil {
+                               continue
+                       }
+                       currIp := ipnet.IP.String()
+                       if !strings.Contains(currIp, ":") && currIp != "127.0.0.1" && isIntranetIpv4(currIp) {
+                               ip = ipnet.IP
+                       }
+               }
+       }
+       return
+}
+// isIntranetIpv4 reports whether ip is a dotted IPv4 address in one of the
+// private ranges 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16 or in the
+// link-local range 169.254.0.0/16. Text that is not an IPv4 address yields
+// false.
+func isIntranetIpv4(ip string) bool {
+       parsed := net.ParseIP(ip).To4()
+       if parsed == nil {
+               return false
+       }
+       switch {
+       case parsed[0] == 10:
+               return true
+       case parsed[0] == 172 && parsed[1] >= 16 && parsed[1] <= 31:
+               return true
+       case parsed[0] == 192 && parsed[1] == 168:
+               return true
+       case parsed[0] == 169 && parsed[1] == 254:
+               return true
+       }
+       return false
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/util/json_util.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/util/json_util.go b/rocketmq-go/util/json_util.go
new file mode 100644
index 0000000..91fdc5d
--- /dev/null
+++ b/rocketmq-go/util/json_util.go
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package util
+
+import (
+       "errors"
+)
+
+// Token type names produced by parseTokenList and consumed by GetKvStringMap.
+const (
+       STRING = "STRING" // text between double quotes
+       NUMBER = "NUMBER" // optional '-' followed by digits
+
+       START_OBJ = "START_OBJ" //{
+       END_OBJ   = "END_OBJ"   //}
+       COMMA     = "COMMA"     //,
+       COLON     = "COLON"     //:
+
+       //// may be next version impl it
+       //BOOL
+       //NULL
+       //START_ARRAY //[
+       //END_ARRAY //]
+       //EOF
+)
+
+// Token is one lexical element produced by parseTokenList.
+type Token struct {
+       tokenType  string // one of the token type constants above
+       tokenValue string // the token text; for STRING tokens, the content without the quotes
+}
+
+////{"offsetTable":{{"brokerName":"broker-b","queueId":122222,"topic":"GoLang"}:9420,{"brokerName":"broker-b","queueId":2,"topic":"GoLang"}:9184,{"brokerName":"broker-b","queueId":1,"topic":"GoLang"}:9260,{"brokerName":"broker-b","queueId":3,"topic":"GoLang"}:9139}}
+
+// GetKvStringMap splits the top-level object in str into raw key/value text.
+// Keys and values are rebuilt from tokens: STRING tokens are re-wrapped in
+// double quotes and all other tokens are appended verbatim, so a key may
+// itself be a whole object (see the sample above). Pairs are separated only on
+// ':' and ',' seen at nesting depth 1.
+// An error from parseTokenList is kept in err while the tokens that were
+// produced are still processed; err is also set when the first token is not
+// '{'.
+func GetKvStringMap(str string) (kvMap map[string]string, err error) {
+       var tokenList []Token
+       tokenList, err = parseTokenList(str)
+       kvMap = map[string]string{}
+       startObjCount := 0 // current '{' nesting depth
+       readType := 0 // 0 begin 1 key 2 value
+       nowKey := ""
+       nowValue := ""
+       for i := 0; i < len(tokenList); i++ {
+               nowToken := tokenList[i]
+               if nowToken.tokenType == START_OBJ {
+                       startObjCount++
+               }
+               if nowToken.tokenType == END_OBJ {
+                       startObjCount--
+               }
+               if readType == 0 {
+                       if nowToken.tokenType != START_OBJ {
+                               err = errors.New("json not start with {")
+                               return
+                       }
+                       readType = 1
+               } else if readType == 1 {
+                       if nowToken.tokenType == COLON { //: split k and v
+                               // only a ':' at depth 1 ends the key; deeper ones belong to the key text
+                               if startObjCount == 1 {
+                                       readType = 2
+                                       continue
+                               }
+                       }
+                       if nowToken.tokenType == STRING {
+                               nowKey = nowKey + "\"" + nowToken.tokenValue + 
"\""
+                       } else {
+                               nowKey = nowKey + nowToken.tokenValue
+                       }
+               } else if readType == 2 {
+                       if nowToken.tokenType == COMMA { // , split kv pair
+                               // only a ',' at depth 1 ends the pair
+                               if startObjCount == 1 {
+                                       kvMap[nowKey] = nowValue
+                                       nowKey = ""
+                                       nowValue = ""
+                                       readType = 1
+                                       continue
+                               }
+                       }
+                       if nowToken.tokenType == STRING {
+                               nowValue = nowValue + "\"" + 
nowToken.tokenValue + "\""
+
+                       } else {
+                               // depth 0 means this is the closing '}' of the outer object; drop it
+                               if startObjCount > 0 { //use less end }
+                                       nowValue = nowValue + 
nowToken.tokenValue
+                               }
+                       }
+
+               } else {
+                       err = errors.New("this is a bug")
+                       return
+               }
+       }
+       // flush the last pair, which is not followed by a ','
+       if len(nowKey) > 0 {
+
+               kvMap[nowKey] = nowValue
+       }
+       return
+}
+
+// parseTokenList splits str into tokens: '{', '}', ',', ':', double-quoted
+// strings (content only, no escape handling) and numbers made of an optional
+// '-' followed by digits. Any other character, including whitespace, or a
+// string without a closing quote stops the scan with an "INVALID JSON" error;
+// the tokens read so far are returned with it.
+func parseTokenList(str string) (tokenList []Token, err error) {
+       for i := 0; i < len(str); i++ {
+               c := str[i]
+               token := Token{}
+               switch {
+               case c == '{':
+                       token.tokenType = START_OBJ
+                       token.tokenValue = string(c)
+               case c == '}':
+                       token.tokenType = END_OBJ
+                       token.tokenValue = string(c)
+               case c == ',':
+                       token.tokenType = COMMA
+                       token.tokenValue = string(c)
+               case c == ':':
+                       token.tokenType = COLON
+                       token.tokenValue = string(c)
+               case c == '"':
+                       // Find the closing quote without running past the input.
+                       end := i + 1
+                       for end < len(str) && str[end] != '"' {
+                               end++
+                       }
+                       if end >= len(str) {
+                               err = errors.New("INVALID JSON")
+                               return
+                       }
+                       token.tokenType = STRING
+                       // Slice instead of converting byte by byte, which would
+                       // re-encode bytes above 0x7f and corrupt UTF-8 text.
+                       token.tokenValue = str[i+1 : end]
+                       i = end
+               case c == '-' || (c >= '0' && c <= '9'):
+                       // A number may be the last thing in the input.
+                       end := i + 1
+                       for end < len(str) && str[end] >= '0' && str[end] <= '9' {
+                               end++
+                       }
+                       token.tokenType = NUMBER
+                       token.tokenValue = str[i:end]
+                       i = end - 1
+               default:
+                       err = errors.New("INVALID JSON")
+                       return
+               }
+               tokenList = append(tokenList, token)
+       }
+       return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/util/message_client_id_generator.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/util/message_client_id_generator.go 
b/rocketmq-go/util/message_client_id_generator.go
new file mode 100644
index 0000000..df4cfb6
--- /dev/null
+++ b/rocketmq-go/util/message_client_id_generator.go
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package util
+
+import (
+       "bytes"
+       "encoding/binary"
+       "os"
+       "strconv"
+       "strings"
+       "sync"
+       "time"
+)
+
+// Package-level state used by GeneratorMessageClientId; that function
+// accesses it while holding lock.
+var (
+       // counter is incremented once per generated id and encoded as the
+       // last two bytes of the id.
+       counter       int16 = 0
+       // startTime is 12:00 local time on the first day of the current month,
+       // in Unix nanoseconds (for example 2017-01-01 12:00:00).
+       startTime     int64
+       // nextStartTime is 12:00 local time on the first day of the next month,
+       // in Unix nanoseconds (for example 2017-02-01 12:00:00).
+       nextStartTime int64
+       // idPrefix caches the result of generatorMessageClientIdPrefix.
+       idPrefix      string
+       // lock serializes id generation.
+       lock          sync.Mutex
+)
+
+// GeneratorMessageClientId returns a unique message id as an upper-case hex
+// string.
+//
+// MessageClientId = ip + pid + classloaderid + time + counter:
+//   4 bytes for ip,
+//   2 bytes for pid,
+//   4 bytes for classloaderid (a Java concept; see generatorMessageClientIdPrefix),
+//   4 bytes for the time difference to startTime in milliseconds,
+//   2 bytes for counter.
+// The nanosecond difference is divided by 1000000 to obtain milliseconds
+// (per the original author's note, because golang differs from java here).
+func GeneratorMessageClientId() (uniqMessageId string) {
+       lock.Lock()
+       defer lock.Unlock()
+       if len(idPrefix) == 0 {
+               idPrefix = generatorMessageClientIdPrefix()
+       }
+       // Read the clock once so the month roll-over check and the encoded
+       // offset are based on the same instant.
+       now := time.Now().UnixNano()
+       if now > nextStartTime {
+               startTime, nextStartTime = getStartAndNextStartTime()
+       }
+       counter = counter + 1
+       // Both values are truncated to their fixed width and written big-endian.
+       var suffix [6]byte
+       binary.BigEndian.PutUint32(suffix[0:4], uint32(int32((now-startTime)/1000000)))
+       binary.BigEndian.PutUint16(suffix[4:6], uint16(counter))
+       uniqMessageId = idPrefix + bytes2string(suffix[:])
+       return
+}
+
+// GeneratorMessageOffsetId hex-encodes (upper case) the store host bytes
+// followed by the port (4 bytes) and the commit offset (8 bytes), both
+// big-endian.
+func GeneratorMessageOffsetId(storeHost []byte, port int32, commitOffset int64) (messageOffsetId string) {
+       var raw bytes.Buffer
+       binary.Write(&raw, binary.BigEndian, storeHost)
+       binary.Write(&raw, binary.BigEndian, port)
+       binary.Write(&raw, binary.BigEndian, commitOffset)
+       return bytes2string(raw.Bytes())
+}
+// generatorMessageClientIdPrefix builds the constant part of a message client
+// id: the bytes from GetIp4Bytes, the process id truncated to 16 bits and a
+// 4-byte class-loader slot, hex-encoded in upper case.
+func generatorMessageClientIdPrefix() (messageClientIdPrefix string) {
+       // golang has no class loader; -1 is written in its slot
+       const classloaderId int32 = -1
+
+       var ip4Bytes []byte = GetIp4Bytes()
+       var prefix bytes.Buffer
+       binary.Write(&prefix, binary.BigEndian, ip4Bytes)
+       binary.Write(&prefix, binary.BigEndian, int16(os.Getpid()))
+       binary.Write(&prefix, binary.BigEndian, classloaderId)
+       return bytes2string(prefix.Bytes())
+}
+// getStartAndNextStartTime returns, in Unix nanoseconds, 12:00 local time on
+// the first day of the current month and of the following month.
+func getStartAndNextStartTime() (thisMonthFirstDay12 int64, nextMonthFirstDay12 int64) {
+       now := time.Now()
+       year, month := now.Year(), now.Month()
+       // time.Date normalizes an out-of-range month, so December+1 becomes
+       // January of the following year.
+       thisMonth := time.Date(year, month, 1, 12, 0, 0, 0, time.Local)
+       nextMonth := time.Date(year, month+1, 1, 12, 0, 0, 0, time.Local)
+       return thisMonth.UnixNano(), nextMonth.UnixNano()
+}
+// bytes2string returns the upper-case hexadecimal encoding of bytes, two
+// digits per byte. An empty or nil slice yields "".
+func bytes2string(bytes []byte) (ret string) {
+       // Collect the digits in one pre-sized buffer instead of growing a
+       // string by concatenation on every byte.
+       hexDigits := make([]byte, 0, 2*len(bytes))
+       for _, oneByte := range bytes {
+               if oneByte < 0x10 {
+                       // pad single-digit values to two characters
+                       hexDigits = append(hexDigits, '0')
+               }
+               hexDigits = strconv.AppendUint(hexDigits, uint64(oneByte), 16)
+       }
+       ret = strings.ToUpper(string(hexDigits))
+       return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/util/string_util.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/util/string_util.go b/rocketmq-go/util/string_util.go
new file mode 100644
index 0000000..7e31e00
--- /dev/null
+++ b/rocketmq-go/util/string_util.go
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package util
+
+import "strconv"
+
+// StrToIntWithDefaultValue parses str as a base-10 int and returns
+// defaultValue when strconv.Atoi reports an error.
+func StrToIntWithDefaultValue(str string, defaultValue int) (result int) {
+       if parsed, err := strconv.Atoi(str); err == nil {
+               return parsed
+       }
+       return defaultValue
+}
+// ReadString returns obj as a string, or "" when obj is nil. It panics if
+// obj is non-nil and does not hold a string.
+func ReadString(obj interface{}) (ret string) {
+       if obj != nil {
+               ret = obj.(string)
+       }
+       return
+}
+
+// IntToString returns the base-10 representation of intValue.
+func IntToString(intValue int) (ret string) {
+       return strconv.Itoa(intValue)
+}
+
+// StrToInt parses str as a base-10 int; it is a thin wrapper around
+// strconv.Atoi and returns its result and error unchanged.
+func StrToInt(str string) (result int, err error) {
+       return strconv.Atoi(str)
+}
+func StrToInt32(str string) (result int32, err error) {
+       var ret int64
+       ret, err = strconv.ParseInt(str, 10, 32)
+       result = int32(ret)
+       return
+}
+func StrToInt16(str string) (result int16, err error) {
+       var ret int64
+       ret, err = strconv.ParseInt(str, 10, 16)
+       result = int16(ret)
+       return
+}
+// StrToInt64 parses str as a base-10 64-bit integer; it returns the result
+// and error of strconv.ParseInt unchanged.
+func StrToInt64(str string) (result int64, err error) {
+       return strconv.ParseInt(str, 10, 64)
+}
+
+// StrToInt32WithDefaultValue parses str with StrToInt32 and returns
+// defaultValue when parsing fails.
+func StrToInt32WithDefaultValue(str string, defaultValue int32) (result int32) {
+       if parsed, err := StrToInt32(str); err == nil {
+               return parsed
+       }
+       return defaultValue
+}
+// StrToInt16WithDefaultValue parses str with StrToInt16 and returns
+// defaultValue when parsing fails.
+func StrToInt16WithDefaultValue(str string, defaultValue int16) (result int16) {
+       if parsed, err := StrToInt16(str); err == nil {
+               return parsed
+       }
+       return defaultValue
+}
+// StrToInt64WithDefaultValue parses str with StrToInt64 and returns
+// defaultValue when parsing fails.
+func StrToInt64WithDefaultValue(str string, defaultValue int64) (result int64) {
+       if parsed, err := StrToInt64(str); err == nil {
+               return parsed
+       }
+       return defaultValue
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/util/structs/field.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/util/structs/field.go 
b/rocketmq-go/util/structs/field.go
new file mode 100644
index 0000000..e697832
--- /dev/null
+++ b/rocketmq-go/util/structs/field.go
@@ -0,0 +1,141 @@
+package structs
+
+import (
+       "errors"
+       "fmt"
+       "reflect"
+)
+
+// Errors returned by Field.Set (and therefore by Field.Zero, which calls Set).
+var (
+       errNotExported = errors.New("field is not exported")
+       errNotSettable = errors.New("field is not settable")
+)
+
+// Field represents a single struct field that encapsulates high level
+// functions around the field.
+type Field struct {
+       // value is the reflected value of the field.
+       value      reflect.Value
+       // field is the reflected description of the field (name, tag, ...).
+       field      reflect.StructField
+       // defaultTag is the tag name that Fields passes on to getFields.
+       defaultTag string
+}
+
+// Tag returns the value associated with key in the field's struct tag. If
+// there is no such key in the tag, Tag returns the empty string.
+func (f *Field) Tag(key string) string {
+       return f.field.Tag.Get(key)
+}
+
+// Value returns the underlying value of the field as an interface{}. It
+// panics if the field is not exported.
+func (f *Field) Value() interface{} {
+       return f.value.Interface()
+}
+
+// IsEmbedded returns true if the given field is an anonymous (embedded) field.
+func (f *Field) IsEmbedded() bool {
+       return f.field.Anonymous
+}
+
+// IsExported returns true if the given field is exported, which is detected
+// by an empty PkgPath on the reflected struct field.
+func (f *Field) IsExported() bool {
+       return f.field.PkgPath == ""
+}
+
+// IsZero reports whether the field currently holds the zero value of its
+// type (i.e. it is not initialized). It panics if the field is not exported.
+func (f *Field) IsZero() bool {
+       zeroValue := reflect.Zero(f.value.Type())
+       return reflect.DeepEqual(f.Value(), zeroValue.Interface())
+}
+
+// Name returns the name of the given field as declared in the struct.
+func (f *Field) Name() string {
+       return f.field.Name
+}
+
+// Kind returns the field's kind, such as "string", "map", "bool", etc.
+func (f *Field) Kind() reflect.Kind {
+       return f.value.Kind()
+}
+
+// Set sets the field to given value val. It returns an error if the field is
+// not settable (not addressable or not exported), if the kind of val differs
+// from the field's kind, or if val's type is not assignable to the field's
+// type.
+func (f *Field) Set(val interface{}) error {
+       // we can't set unexported fields, so be sure this field is exported
+       if !f.IsExported() {
+               return errNotExported
+       }
+
+       // exported but not addressable values cannot be set either
+       if !f.value.CanSet() {
+               return errNotSettable
+       }
+
+       given := reflect.ValueOf(val)
+
+       if f.value.Kind() != given.Kind() {
+               return fmt.Errorf("wrong kind. got: %s want: %s", given.Kind(), f.value.Kind())
+       }
+
+       // Equal kinds do not imply assignable types (two distinct struct types,
+       // []int vs []string, a named int vs int, ...). reflect.Value.Set would
+       // panic on those, so report an error instead.
+       if !given.Type().AssignableTo(f.value.Type()) {
+               return fmt.Errorf("wrong type. got: %s want: %s", given.Type(), f.value.Type())
+       }
+
+       f.value.Set(given)
+       return nil
+}
+
+// Zero sets the field to the zero value of its type by calling Set, so it
+// returns the same errors as Set (for example when the field is not exported
+// or not settable).
+func (f *Field) Zero() error {
+       return f.Set(reflect.Zero(f.value.Type()).Interface())
+}
+
+// Fields returns a slice of Fields. This is particularly handy to get the
+// fields of a nested struct. A struct tag with the content of "-" ignores the
+// checking of that particular field; the tag is looked up under the field's
+// defaultTag name. Example:
+//
+//   // Field is ignored by this package.
+//   Field *http.Request `structs:"-"`
+//
+// It panics if field is not exported or if field's kind is not struct
+func (f *Field) Fields() []*Field {
+       return getFields(f.value, f.defaultTag)
+}
+
+// Field returns the named field of a nested struct. It is the panicking
+// variant of FieldOk: it panics with "field not found" when the lookup fails,
+// and also if the nested struct is not exported.
+func (f *Field) Field(name string) *Field {
+       if nestedField, ok := f.FieldOk(name); ok {
+               return nestedField
+       }
+       panic("field not found")
+}
+
+// FieldOk returns the named field of a nested struct. The boolean reports
+// whether the field was found (true) or not (false). The returned Field
+// carries no defaultTag.
+func (f *Field) FieldOk(name string) (*Field, bool) {
+       // The nested value must stay settable, so strctVal is handed a pointer
+       // to the variable rather than a copy (a copy is not assigned to any
+       // variable, hence not settable).
+       // see https://blog.golang.org/laws-of-reflection#TOC_8.
+       target := f.value
+       if target.Kind() != reflect.Ptr {
+               target = target.Addr()
+       }
+
+       structValue := strctVal(target.Interface())
+       structField, found := structValue.Type().FieldByName(name)
+       if !found {
+               return nil, false
+       }
+
+       nestedField := &Field{
+               field: structField,
+               value: structValue.FieldByName(name),
+       }
+       return nestedField, true
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/util/structs/structs.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/util/structs/structs.go 
b/rocketmq-go/util/structs/structs.go
new file mode 100644
index 0000000..87b7970
--- /dev/null
+++ b/rocketmq-go/util/structs/structs.go
@@ -0,0 +1,581 @@
+// Package structs contains various utilities functions to work with structs.
+package structs
+
+import (
+       "fmt"
+
+       "reflect"
+       "strings"
+)
+
+var (
+       // DefaultTagName is the default struct tag key consulted by this
+       // package. Field tags under this key allow per-field tweaks (see the
+       // options documented on the individual functions). New copies it into
+       // Struct.TagName.
+       DefaultTagName = "structs" // struct's field default tag name
+)
+
+// Struct encapsulates a struct type to provide several high level functions
+// around the struct.
+type Struct struct {
+       // raw is the value originally passed to New.
+       raw     interface{}
+       // value is the reflected struct value produced by strctVal.
+       value   reflect.Value
+       // TagName is the struct tag key read for field options; New sets it
+       // to DefaultTagName.
+       TagName string
+}
+
+// New returns a new *Struct wrapping s, with TagName set to DefaultTagName.
+// s may be a struct or a pointer to a struct. It panics if s's kind is not
+// struct.
+func New(s interface{}) *Struct {
+       return &Struct{
+               raw:     s,
+               value:   strctVal(s),
+               TagName: DefaultTagName,
+       }
+}
+
+// Map converts the given struct to a map[string]interface{}, where the keys
+// of the map are the field names and the values of the map the associated
+// values of the fields. The default key string is the struct field name but
+// can be changed in the struct field's tag value. The "structs" key in the
+// struct's field tag value is the key name. Example:
+//
+//   // Field appears in map as key "myName".
+//   Name string `structs:"myName"`
+//
+// A tag value with the content of "-" ignores that particular field. Example:
+//
+//   // Field is ignored by this package.
+//   Field bool `structs:"-"`
+//
+// A tag value with the option of "string" uses the Stringer to get the
+// value. Example:
+//
+//   // The value will be output of Animal's String() func.
+//   // Map will panic if Animal does not implement String().
+//   Field *Animal `structs:"field,string"`
+//
+// A tag value with the option of "flatten" used in a struct field is to
+// flatten its fields in the output map. Example:
+// in the output map. Example:
+//
+//   // The FieldStruct's fields will be flattened into the output map.
+//   FieldStruct time.Time `structs:",flatten"`
+//
+// A tag value with the option of "omitnested" stops iterating further if the
+// type is a struct. Example:
+// is a struct. Example:
+//
+//   // Field is not processed further by this package.
+//   Field time.Time     `structs:"myName,omitnested"`
+//   Field *http.Request `structs:",omitnested"`
+//
+// A tag value with the option of "omitempty" ignores that particular field if
+// the field value is empty. Example:
+//
+//   // Field appears in map as key "myName", but the field is
+//   // skipped if empty.
+//   Field string `structs:"myName,omitempty"`
+//
+//   // Field appears in map as key "Field" (the default), but
+//   // the field is skipped if empty.
+//   Field string `structs:",omitempty"`
+//
+// Note that only exported fields of a struct can be accessed, non exported
+// fields will be neglected.
+func (s *Struct) Map() map[string]interface{} {
+       // allocate the result and let FillMap do the actual conversion
+       result := map[string]interface{}{}
+       s.FillMap(result)
+       return result
+}
+
+// FillMap is the same as Map. Instead of returning the output, it fills the
+// given map. A nil map is left untouched.
+//
+// NOTE: every key written by this method is the field name with its first
+// letter lower-cased; the name part of the struct tag is parsed but is not
+// used as the key.
+func (s *Struct) FillMap(out map[string]interface{}) {
+       if out == nil {
+               return
+       }
+
+       for _, field := range s.structFields() {
+               name := field.Name
+               smallName := strings.Replace(name, string(name[0]), strings.ToLower(string(name[0])), 1) //todo
+               val := s.value.FieldByName(name)
+
+               _, tagOpts := parseTag(field.Tag.Get(s.TagName))
+
+               // if the value is a zero value and the field is marked as
+               // omitempty do not include
+               if tagOpts.Has("omitempty") {
+                       zero := reflect.Zero(val.Type()).Interface()
+                       if reflect.DeepEqual(val.Interface(), zero) {
+                               continue
+                       }
+               }
+
+               isSubStruct := false
+               var finalVal interface{}
+
+               if !tagOpts.Has("omitnested") {
+                       finalVal = s.nested(val)
+
+                       v := reflect.ValueOf(val.Interface())
+                       if v.Kind() == reflect.Ptr {
+                               v = v.Elem()
+                       }
+
+                       switch v.Kind() {
+                       case reflect.Map, reflect.Struct:
+                               isSubStruct = true
+                       }
+               } else {
+                       finalVal = val.Interface()
+               }
+
+               // "string": store the Stringer output; the field is skipped
+               // entirely when the value does not implement fmt.Stringer
+               if tagOpts.Has("string") {
+                       if stringer, ok := val.Interface().(fmt.Stringer); ok {
+                               out[smallName] = stringer.String()
+                       }
+                       continue
+               }
+
+               if isSubStruct && tagOpts.Has("flatten") {
+                       // nested hands back the original value instead of a
+                       // map[string]interface{} for structs without exported
+                       // fields and for maps whose elements are not structs.
+                       // Such values cannot be flattened, so they are stored
+                       // under the field key instead of panicking.
+                       if flat, ok := finalVal.(map[string]interface{}); ok {
+                               for k, v := range flat {
+                                       out[k] = v
+                               }
+                               continue
+                       }
+               }
+
+               out[smallName] = finalVal
+       }
+}
+
+// Values converts the given s struct's field values to a []interface{}.  A
+// struct tag with the content of "-" ignores the that particular field.
+// Example:
+//
+//   // Field is ignored by this package.
+//   Field int `structs:"-"`
+//
+// A value with the option of "omitnested" stops iterating further if the type
+// is a struct. Example:
+//
+//   // Fields is not processed further by this package.
+//   Field time.Time     `structs:",omitnested"`
+//   Field *http.Request `structs:",omitnested"`
+//
+// A tag value with the option of "omitempty" ignores that particular field and
+// is not added to the values if the field value is empty. Example:
+//
+//   // Field is skipped if empty
+//   Field string `structs:",omitempty"`
+//
+// Note that only exported fields of a struct can be accessed, non exported
+// fields  will be neglected.
+func (s *Struct) Values() []interface{} {
+       var values []interface{}
+
+       for _, field := range s.structFields() {
+               fieldVal := s.value.FieldByName(field.Name)
+
+               _, opts := parseTag(field.Tag.Get(s.TagName))
+
+               // omitempty: leave out fields that still hold their zero value
+               if opts.Has("omitempty") {
+                       zero := reflect.Zero(fieldVal.Type()).Interface()
+                       if reflect.DeepEqual(fieldVal.Interface(), zero) {
+                               continue
+                       }
+               }
+
+               // string: use the Stringer output, skip the field when the
+               // value is not a fmt.Stringer
+               if opts.Has("string") {
+                       if stringer, ok := fieldVal.Interface().(fmt.Stringer); ok {
+                               values = append(values, stringer.String())
+                       }
+                       continue
+               }
+
+               // plain values, and structs marked omitnested, are added as is
+               if !IsStruct(fieldVal.Interface()) || opts.Has("omitnested") {
+                       values = append(values, fieldVal.Interface())
+                       continue
+               }
+
+               // embedded/nested structs contribute their own values to the
+               // final slice
+               values = append(values, Values(fieldVal.Interface())...)
+       }
+
+       return values
+}
+
+// Fields returns a slice of Fields. A struct tag with the content of "-"
+// (looked up under s.TagName) ignores the checking of that particular field.
+// Example:
+//
+//   // Field is ignored by this package.
+//   Field bool `structs:"-"`
+//
+// It panics if s's kind is not struct.
+func (s *Struct) Fields() []*Field {
+       return getFields(s.value, s.TagName)
+}
+
+// Names returns the names of the struct's fields, in the order getFields
+// yields them. A struct tag with the content of "-" excludes that particular
+// field. Example:
+//
+//   // Field is ignored by this package.
+//   Field bool `structs:"-"`
+//
+// It panics if s's kind is not struct.
+func (s *Struct) Names() []string {
+       fields := getFields(s.value, s.TagName)
+
+       names := make([]string, 0, len(fields))
+       for _, f := range fields {
+               names = append(names, f.Name())
+       }
+
+       return names
+}
+
+// getFields wraps every field of the struct v (or of the struct v points to)
+// in a *Field, skipping fields whose tagName tag is exactly "-". The returned
+// Fields carry no defaultTag.
+func getFields(v reflect.Value, tagName string) []*Field {
+       if v.Kind() == reflect.Ptr {
+               v = v.Elem()
+       }
+
+       structType := v.Type()
+
+       var fields []*Field
+       for i, n := 0, structType.NumField(); i < n; i++ {
+               sf := structType.Field(i)
+               if sf.Tag.Get(tagName) == "-" {
+                       continue
+               }
+
+               fields = append(fields, &Field{
+                       field: sf,
+                       value: v.FieldByName(sf.Name),
+               })
+       }
+
+       return fields
+}
+
+// Field returns a new Field struct that provides several high level functions
+// around a single struct field entity. It is the panicking variant of
+// FieldOk: it panics with "field not found" if the field does not exist.
+func (s *Struct) Field(name string) *Field {
+       if f, ok := s.FieldOk(name); ok {
+               return f
+       }
+       panic("field not found")
+}
+
+// FieldOk returns a new Field struct that provides several high level
+// functions around a single struct field entity. The boolean returns true if
+// the field was found. The returned Field inherits s.TagName as its
+// defaultTag.
+func (s *Struct) FieldOk(name string) (*Field, bool) {
+       structField, found := s.value.Type().FieldByName(name)
+       if !found {
+               return nil, false
+       }
+
+       f := &Field{
+               field:      structField,
+               value:      s.value.FieldByName(name),
+               defaultTag: s.TagName,
+       }
+       return f, true
+}
+
+// IsZero returns true if all fields in a struct is a zero value (not
+// initialized) A struct tag with the content of "-" ignores the checking of
+// that particular field. Example:
+//
+//   // Field is ignored by this package.
+//   Field bool `structs:"-"`
+//
+// A value with the option of "omitnested" stops iterating further if the type
+// is a struct. Example:
+//
+//   // Field is not processed further by this package.
+//   Field time.Time     `structs:"myName,omitnested"`
+//   Field *http.Request `structs:",omitnested"`
+//
+// Note that only exported fields of a struct can be accessed, non exported
+// fields  will be neglected. It panics if s's kind is not struct.
+func (s *Struct) IsZero() bool {
+       for _, field := range s.structFields() {
+               fieldVal := s.value.FieldByName(field.Name)
+
+               _, opts := parseTag(field.Tag.Get(s.TagName))
+
+               // current value of the given field
+               current := fieldVal.Interface()
+
+               // nested structs are checked recursively unless omitnested is set
+               if IsStruct(current) && !opts.Has("omitnested") {
+                       if !IsZero(current) {
+                               return false
+                       }
+                       continue
+               }
+
+               // zero value of the given field, such as "" for string, 0 for int
+               zero := reflect.Zero(fieldVal.Type()).Interface()
+               if !reflect.DeepEqual(current, zero) {
+                       return false
+               }
+       }
+
+       return true
+}
+
+// HasZero returns true if a field in a struct is not initialized (zero value).
+// A struct tag with the content of "-" ignores the checking of that particular
+// field. Example:
+//
+//   // Field is ignored by this package.
+//   Field bool `structs:"-"`
+//
+// A value with the option of "omitnested" stops iterating further if the type
+// is a struct. Example:
+//
+//   // Field is not processed further by this package.
+//   Field time.Time     `structs:"myName,omitnested"`
+//   Field *http.Request `structs:",omitnested"`
+//
+// Note that only exported fields of a struct can be accessed, non exported
+// fields  will be neglected. It panics if s's kind is not struct.
+func (s *Struct) HasZero() bool {
+       for _, field := range s.structFields() {
+               fieldVal := s.value.FieldByName(field.Name)
+
+               _, opts := parseTag(field.Tag.Get(s.TagName))
+
+               // current value of the given field
+               current := fieldVal.Interface()
+
+               // nested structs are checked recursively unless omitnested is set
+               if IsStruct(current) && !opts.Has("omitnested") {
+                       if HasZero(current) {
+                               return true
+                       }
+                       continue
+               }
+
+               // zero value of the given field, such as "" for string, 0 for int
+               zero := reflect.Zero(fieldVal.Type()).Interface()
+               if reflect.DeepEqual(current, zero) {
+                       return true
+               }
+       }
+
+       return false
+}
+
+// Name returns the struct's type name within its package, as reported by
+// reflect. For more info refer to the package-level Name() function.
+func (s *Struct) Name() string {
+       return s.value.Type().Name()
+}
+
+// structFields returns the exported struct fields of s that are not excluded
+// with a "-" tag. It is a helper shared by the methods that iterate over the
+// fields of the struct.
+func (s *Struct) structFields() []reflect.StructField {
+       structType := s.value.Type()
+
+       var exported []reflect.StructField
+       for i, n := 0, structType.NumField(); i < n; i++ {
+               sf := structType.Field(i)
+
+               // a non-empty PkgPath marks an unexported field, whose value we
+               // can't access; a "-" tag marks a field that is omitted on purpose
+               if sf.PkgPath != "" || sf.Tag.Get(s.TagName) == "-" {
+                       continue
+               }
+
+               exported = append(exported, sf)
+       }
+
+       return exported
+}
+
+// strctVal returns the reflected struct value of s, following pointers (of
+// any depth) to the underlying element. It panics with "not struct" when the
+// result is not a struct.
+func strctVal(s interface{}) reflect.Value {
+       v := reflect.ValueOf(s)
+
+       // if pointer get the underlying element
+       for v.Kind() == reflect.Ptr {
+               v = v.Elem()
+       }
+
+       if v.Kind() == reflect.Struct {
+               return v
+       }
+       panic("not struct")
+}
+
+// Map converts the given struct to a map[string]interface{} using a Struct
+// created by New (default tag name). For more info refer to Struct types
+// Map() method. It panics if s's kind is not struct.
+func Map(s interface{}) map[string]interface{} {
+       return New(s).Map()
+}
+
+// FillMap is the same as Map. Instead of returning the output, it fills the
+// given map. It panics if s's kind is not struct.
+func FillMap(s interface{}, out map[string]interface{}) {
+       New(s).FillMap(out)
+}
+
+// Values converts the given struct to a []interface{} using a Struct created
+// by New (default tag name). For more info refer to Struct types Values()
+// method. It panics if s's kind is not struct.
+func Values(s interface{}) []interface{} {
+       return New(s).Values()
+}
+
+// Fields returns a slice of *Field for the given struct. For more info refer
+// to Struct types Fields() method. It panics if s's kind is not struct.
+func Fields(s interface{}) []*Field {
+       return New(s).Fields()
+}
+
+// Names returns a slice of field names for the given struct. For more info
+// refer to Struct types Names() method. It panics if s's kind is not struct.
+func Names(s interface{}) []string {
+       return New(s).Names()
+}
+
+// IsZero returns true if all fields are equal to a zero value. For more info
+// refer to Struct types IsZero() method. It panics if s's kind is not struct.
+func IsZero(s interface{}) bool {
+       return New(s).IsZero()
+}
+
+// HasZero returns true if any field is equal to a zero value. For more info
+// refer to Struct types HasZero() method. It panics if s's kind is not
+// struct.
+func HasZero(s interface{}) bool {
+       return New(s).HasZero()
+}
+
+// IsStruct reports whether the given variable is a struct or a pointer to a
+// struct. Only one level of pointer is followed.
+func IsStruct(s interface{}) bool {
+       v := reflect.ValueOf(s)
+       if v.Kind() == reflect.Ptr {
+               v = v.Elem()
+       }
+
+       // a nil interface or nil pointer leaves v with the Invalid kind, which
+       // is not Struct either
+       return v.Kind() == reflect.Struct
+}
+
+// Name returns the struct's type name within its package. It returns an
+// empty string for unnamed types. It panics if s's kind is not struct.
+func Name(s interface{}) string {
+       return New(s).Name()
+}
+
+// nested retrieves recursively all types for the given value and returns the
+// nested value: structs become map[string]interface{} (via Map), maps and
+// slices/arrays of structs are converted element by element, and everything
+// else is returned unchanged as val.Interface().
+func (s *Struct) nested(val reflect.Value) interface{} {
+       var finalVal interface{}
+
+       // v is the dynamic value (one pointer level removed); val keeps the
+       // static type of the field or element being converted
+       v := reflect.ValueOf(val.Interface())
+       if v.Kind() == reflect.Ptr {
+               v = v.Elem()
+       }
+
+       switch v.Kind() {
+       case reflect.Struct:
+               n := New(val.Interface())
+               n.TagName = s.TagName
+               m := n.Map()
+
+               // do not add the converted value if there are no exported
+               // fields, ie: time.Time
+               if len(m) == 0 {
+                       finalVal = val.Interface()
+               } else {
+                       finalVal = m
+               }
+       case reflect.Map:
+               // When the static type is an interface (an interface{} field
+               // holding a map) reflect.Type.Elem would panic, so the value is
+               // passed through as is -- the same guard the slice case uses.
+               if val.Type().Kind() == reflect.Interface {
+                       finalVal = val.Interface()
+                       break
+               }
+
+               elemType := val.Type().Elem()
+               if elemType.Kind() == reflect.Ptr {
+                       elemType = elemType.Elem()
+               }
+
+               // only iterate over struct types, ie: map[string]StructType,
+               // map[string][]StructType,
+               if elemType.Kind() == reflect.Struct ||
+                       (elemType.Kind() == reflect.Slice && elemType.Elem().Kind() == reflect.Struct) {
+                       m := make(map[string]interface{}, val.Len())
+                       for _, k := range val.MapKeys() {
+                               m[k.String()] = s.nested(val.MapIndex(k))
+                       }
+                       finalVal = m
+                       break
+               }
+
+               // TODO(arslan): should this be optional?
+               finalVal = val.Interface()
+       case reflect.Slice, reflect.Array:
+               if val.Type().Kind() == reflect.Interface {
+                       finalVal = val.Interface()
+                       break
+               }
+
+               // TODO(arslan): should this be optional?
+               // do not iterate of non struct types, just pass the value. Ie:
+               // []int, []string, co... We only iterate further if it's a
+               // struct. i.e []foo or []*foo
+               if val.Type().Elem().Kind() != reflect.Struct &&
+                       !(val.Type().Elem().Kind() == reflect.Ptr &&
+                               val.Type().Elem().Elem().Kind() == reflect.Struct) {
+                       finalVal = val.Interface()
+                       break
+               }
+
+               slices := make([]interface{}, val.Len(), val.Len())
+               for x := 0; x < val.Len(); x++ {
+                       slices[x] = s.nested(val.Index(x))
+               }
+               finalVal = slices
+       default:
+               finalVal = val.Interface()
+       }
+
+       return finalVal
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/util/structs/tags.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/util/structs/tags.go b/rocketmq-go/util/structs/tags.go
new file mode 100644
index 0000000..8859341
--- /dev/null
+++ b/rocketmq-go/util/structs/tags.go
@@ -0,0 +1,32 @@
+package structs
+
+import "strings"
+
+// tagOptions contains a slice of tag options
+type tagOptions []string
+
+// Has returns true if the given option is available in tagOptions
+func (t tagOptions) Has(opt string) bool {
+       for i := range t {
+               if t[i] == opt {
+                       return true
+               }
+       }
+       return false
+}
+
+// parseTag splits a struct field's tag into its name and a list of options
+// which comes after a name. A tag is in the form of: "name,option1,option2".
+// The name can be neglected, so tag is one of the following:
+//   ""
+//   "name"
+//   "name,opt"
+//   "name,opt,opt2"
+//   ",opt"
+func parseTag(tag string) (string, tagOptions) {
+       // Split always yields at least one element, so parts[0] is safe even
+       // for the empty tag
+       parts := strings.Split(tag, ",")
+       name, opts := parts[0], parts[1:]
+       return name, opts
+}

Reply via email to