Repository: incubator-rocketmq-externals Updated Branches: refs/heads/master 7ceba1b23 -> aaa0758e6
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/service/mq_client.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/service/mq_client.go b/rocketmq-go/service/mq_client.go index 0cf7df2..8bbfe79 100644 --- a/rocketmq-go/service/mq_client.go +++ b/rocketmq-go/service/mq_client.go @@ -16,5 +16,348 @@ */ package service +import ( + "encoding/json" + "errors" + "fmt" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting" + "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util" + "github.com/golang/glog" + "os" + "strconv" + "strings" + "time" +) + +//this struct is for something common,for example +//1.brokerInfo +//2.routerInfo +//3.subscribeInfo +//4.heartbeat type RocketMqClient interface { + GetClientId() (clientId string) + GetRemotingClient() (remotingClient *remoting.DefalutRemotingClient) + GetTopicSubscribeInfo(topic string) (messageQueueList []*model.MessageQueue) + GetPublishTopicList() []string + FetchMasterBrokerAddress(brokerName string) (masterAddress string) + EnqueuePullMessageRequest(pullRequest *model.PullRequest) + DequeuePullMessageRequest() (pullRequest *model.PullRequest) + FindBrokerAddressInSubscribe(brokerName string, brokerId int, onlyThisBroker bool) (brokerAddr string, slave bool, found bool) + TryToFindTopicPublishInfo(topic string) (topicPublicInfo *model.TopicPublishInfo, err error) + FindBrokerAddrByTopic(topic string) (addr string, ok bool) + UpdateTopicRouteInfoFromNameServer(topic string) (err error) + UpdateTopicRouteInfoFromNameServerUseDefaultTopic(topic string) (err error) + SendHeartbeatToAllBroker(heartBeatData *model.HeartbeatData) (err error) + ClearExpireResponse() + GetMaxOffset(mq *model.MessageQueue) int64 + SearchOffset(mq *model.MessageQueue, time time.Time) int64 +} + +var DEFAULT_TIMEOUT int64 = 6000 + +// common +type MqClientImpl struct { + ClientId string + remotingClient remoting.RemotingClient + TopicRouteTable util.ConcurrentMap // map[string]*model.TopicRouteData //topic | topicRoteData + BrokerAddrTable util.ConcurrentMap //map[string]map[int]string //brokerName | map[brokerId]address + TopicPublishInfoTable util.ConcurrentMap //map[string]*model.TopicPublishInfo //topic | TopicPublishInfo //all use this + TopicSubscribeInfoTable util.ConcurrentMap //map[string][]*model.MessageQueue //topic | MessageQueue + PullRequestQueue chan *model.PullRequest //todo move +} + +func MqClientInit(clientConfig *config.ClientConfig, clientRequestProcessor remoting.ClientRequestProcessor) (mqClientImpl *MqClientImpl) { + mqClientImpl = &MqClientImpl{} + mqClientImpl.ClientId = buildMqClientImplId() + mqClientImpl.TopicRouteTable = util.New() // make(map[string]*model.TopicRouteData) + mqClientImpl.BrokerAddrTable = util.New() //make(map[string]map[int]string) + mqClientImpl.remotingClient = remoting.RemotingClientInit(clientConfig, clientRequestProcessor) + mqClientImpl.TopicPublishInfoTable = util.New() //make(map[string]*model.TopicPublishInfo) + mqClientImpl.TopicSubscribeInfoTable = util.New() //make(map[string][]*model.MessageQueue) + mqClientImpl.PullRequestQueue = make(chan *model.PullRequest, 1024) + return +} +func (self *MqClientImpl) GetTopicSubscribeInfo(topic string) (messageQueueList []*model.MessageQueue) { + value, ok := self.TopicSubscribeInfoTable.Get(topic) + if ok { + messageQueueList = value.([]*model.MessageQueue) + } + return +} +func (self *MqClientImpl) GetMaxOffset(mq *model.MessageQueue) int64 { + brokerAddr := self.FetchMasterBrokerAddress(mq.BrokerName) + if len(brokerAddr) == 0 { + self.TryToFindTopicPublishInfo(mq.Topic) + brokerAddr = self.FetchMasterBrokerAddress(mq.BrokerName) + } + getMaxOffsetRequestHeader := &header.GetMaxOffsetRequestHeader{Topic: mq.Topic, QueueId: mq.QueueId} + remotingCmd := remoting.NewRemotingCommand(remoting.GET_MAX_OFFSET, getMaxOffsetRequestHeader) + response, err := self.remotingClient.InvokeSync(brokerAddr, remotingCmd, DEFAULT_TIMEOUT) + if err != nil { + return -1 + } + queryOffsetResponseHeader := header.QueryOffsetResponseHeader{} + queryOffsetResponseHeader.FromMap(response.ExtFields) + glog.Info("op=look max offset result", string(response.Body)) + return queryOffsetResponseHeader.Offset +} +func (self *MqClientImpl) SearchOffset(mq *model.MessageQueue, time time.Time) int64 { + brokerAddr := self.FetchMasterBrokerAddress(mq.BrokerName) + if len(brokerAddr) == 0 { + self.TryToFindTopicPublishInfo(mq.Topic) + brokerAddr = self.FetchMasterBrokerAddress(mq.BrokerName) + } + timeStamp := time.UnixNano() / 1000000 + searchOffsetRequestHeader := &header.SearchOffsetRequestHeader{Topic: mq.Topic, QueueId: mq.QueueId, Timestamp: timeStamp} + remotingCmd := remoting.NewRemotingCommand(remoting.SEARCH_OFFSET_BY_TIMESTAMP, searchOffsetRequestHeader) + response, err := self.remotingClient.InvokeSync(brokerAddr, remotingCmd, DEFAULT_TIMEOUT) + if err != nil { + return -1 + } + queryOffsetResponseHeader := header.QueryOffsetResponseHeader{} + + queryOffsetResponseHeader.FromMap(response.ExtFields) + glog.Info("op=look search offset result", string(response.Body)) + return queryOffsetResponseHeader.Offset +} +func (self *MqClientImpl) GetClientId() string { + return self.ClientId +} +func (self *MqClientImpl) GetPublishTopicList() []string { + var publishTopicList []string + for _, topic := range self.TopicPublishInfoTable.Keys() { + publishTopicList = append(publishTopicList, topic) + } + return publishTopicList +} +func (self *MqClientImpl) GetRemotingClient() remoting.RemotingClient { + return self.remotingClient +} + +func (self *MqClientImpl) EnqueuePullMessageRequest(pullRequest *model.PullRequest) { + self.PullRequestQueue <- pullRequest +} +func (self *MqClientImpl) DequeuePullMessageRequest() (pullRequest *model.PullRequest) { + pullRequest = <-self.PullRequestQueue + return +} + +func (self *MqClientImpl) ClearExpireResponse() { + //self.remotingClient.ClearExpireResponse() +} + +func (self *MqClientImpl) FetchMasterBrokerAddress(brokerName string) (masterAddress string) { + value, ok := self.BrokerAddrTable.Get(brokerName) + if ok { + masterAddress = value.(map[string]string)["0"] + } + return +} +func (self *MqClientImpl) TryToFindTopicPublishInfo(topic string) (topicPublicInfo *model.TopicPublishInfo, err error) { + value, ok := self.TopicPublishInfoTable.Get(topic) + if ok { + topicPublicInfo = value.(*model.TopicPublishInfo) + } + + if topicPublicInfo == nil || !topicPublicInfo.JudgeTopicPublishInfoOk() { + self.TopicPublishInfoTable.Set(topic, &model.TopicPublishInfo{HaveTopicRouterInfo: false}) + err = self.UpdateTopicRouteInfoFromNameServer(topic) + if err != nil { + glog.Warning(err) // if updateRouteInfo error, maybe we can use the defaultTopic + } + value, ok := self.TopicPublishInfoTable.Get(topic) + if ok { + topicPublicInfo = value.(*model.TopicPublishInfo) + } + } + if topicPublicInfo.HaveTopicRouterInfo && topicPublicInfo.JudgeTopicPublishInfoOk() { + return + } + //try to use the defaultTopic + err = self.UpdateTopicRouteInfoFromNameServerUseDefaultTopic(topic) + + defaultValue, defaultValueOk := self.TopicPublishInfoTable.Get(topic) + if defaultValueOk { + topicPublicInfo = defaultValue.(*model.TopicPublishInfo) + } + + return +} + +func (self MqClientImpl) GetTopicRouteInfoFromNameServer(topic string, timeoutMillis int64) (*model.TopicRouteData, error) { + requestHeader := &header.GetRouteInfoRequestHeader{ + Topic: topic, + } + var remotingCommand = remoting.NewRemotingCommand(remoting.GET_ROUTEINTO_BY_TOPIC, requestHeader) + response, err := self.remotingClient.InvokeSync("", remotingCommand, timeoutMillis) + + if err != nil { + return nil, err + } + if response.Code == remoting.SUCCESS { + //todo it's dirty + topicRouteData := new(model.TopicRouteData) + bodyjson := strings.Replace(string(response.Body), ",0:", ",\"0\":", -1) + bodyjson = strings.Replace(bodyjson, ",1:", ",\"1\":", -1) // fastJsonçkey没æå¼å· éè¦éç¨çæ¹æ³ + bodyjson = strings.Replace(bodyjson, "{0:", "{\"0\":", -1) + bodyjson = strings.Replace(bodyjson, "{1:", "{\"1\":", -1) + err = json.Unmarshal([]byte(bodyjson), topicRouteData) + if err != nil { + glog.Error(err, bodyjson) + return nil, err + } + return topicRouteData, nil + } else { + return nil, errors.New(fmt.Sprintf("get topicRouteInfo from nameServer error[code:%d,topic:%s]", response.Code, topic)) + } +} + +func (self MqClientImpl) FindBrokerAddressInSubscribe(brokerName string, brokerId int, onlyThisBroker bool) (brokerAddr string, slave bool, found bool) { + slave = false + found = false + value, ok := self.BrokerAddrTable.Get(brokerName) + if !ok { + return + } + brokerMap := value.(map[string]string) + //self.brokerAddrTableLock.RUnlock() + brokerAddr, ok = brokerMap[util.IntToString(brokerId)] + slave = (brokerId != 0) + found = ok + + if !found && !onlyThisBroker { + var id string + for id, brokerAddr = range brokerMap { + slave = (id != "0") + found = true + break + } + } + return +} + +func (self MqClientImpl) UpdateTopicRouteInfoFromNameServer(topic string) (err error) { + var ( + topicRouteData *model.TopicRouteData + ) + //namesvr lock + //topicRouteData = this.MqClientImplAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); + topicRouteData, err = self.GetTopicRouteInfoFromNameServer(topic, 1000*3) + if err != nil { + return + } + self.updateTopicRouteInfoLocal(topic, topicRouteData) + return +} +func (self MqClientImpl) UpdateTopicRouteInfoFromNameServerUseDefaultTopic(topic string) (err error) { + var ( + topicRouteData *model.TopicRouteData + ) + //namesvr lock + //topicRouteData = this.MqClientImplAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); + topicRouteData, err = self.GetTopicRouteInfoFromNameServer(constant.DEFAULT_TOPIC, 1000*3) + if err != nil { + return + } + + for _, queueData := range topicRouteData.QueueDatas { + defaultQueueData := constant.DEFAULT_TOPIC_QUEUE_NUMS + if queueData.ReadQueueNums < defaultQueueData { + defaultQueueData = queueData.ReadQueueNums + } + queueData.ReadQueueNums = defaultQueueData + queueData.WriteQueueNums = defaultQueueData + } + self.updateTopicRouteInfoLocal(topic, topicRouteData) + return +} +func (self MqClientImpl) updateTopicRouteInfoLocal(topic string, topicRouteData *model.TopicRouteData) (err error) { + if topicRouteData == nil { + return + } + // topicRouteData judgeTopicRouteData need update + needUpdate := true + if !needUpdate { + return + } + //update brokerAddrTable + for _, brokerData := range topicRouteData.BrokerDatas { + self.BrokerAddrTable.Set(brokerData.BrokerName, brokerData.BrokerAddrs) + } + + //update pubInfo for each + topicPublishInfo := model.BuildTopicPublishInfoFromTopicRoteData(topic, topicRouteData) + self.TopicPublishInfoTable.Set(topic, topicPublishInfo) + + mqList := model.BuildTopicSubscribeInfoFromRoteData(topic, topicRouteData) + self.TopicSubscribeInfoTable.Set(topic, mqList) + self.TopicRouteTable.Set(topic, topicRouteData) + return +} + +func (self MqClientImpl) FindBrokerAddrByTopic(topic string) (addr string, ok bool) { + value, findValue := self.TopicRouteTable.Get(topic) + if !findValue { + return "", false + } + topicRouteData := value.(*model.TopicRouteData) + brokers := topicRouteData.BrokerDatas + if brokers != nil && len(brokers) > 0 { + brokerData := brokers[0] + brokerData.BrokerAddrsLock.RLock() + addr, ok = brokerData.BrokerAddrs["0"] + brokerData.BrokerAddrsLock.RUnlock() + + if ok { + return + } + for _, addr = range brokerData.BrokerAddrs { + return addr, ok + } + } + return +} + +func buildMqClientImplId() (clientId string) { + clientId = util.GetLocalIp4() + "@" + strconv.Itoa(os.Getpid()) + return +} + +func (self MqClientImpl) sendHeartBeat(addr string, remotingCommand *remoting.RemotingCommand, timeoutMillis int64) error { + remotingCommand, err := self.remotingClient.InvokeSync(addr, remotingCommand, timeoutMillis) + if err != nil { + glog.Error(err) + } else { + if remotingCommand == nil || remotingCommand.Code != remoting.SUCCESS { + glog.Error("send heartbeat response error") + } + } + return err +} + +func (self MqClientImpl) SendHeartbeatToAllBroker(heartBeatData *model.HeartbeatData) (err error) { + //self.brokerAddrTableLock.RLock() + + for _, brokerTable := range self.BrokerAddrTable.Items() { + for brokerId, addr := range brokerTable.(map[string]string) { + if len(addr) == 0 || brokerId != "0" { + continue + } + data, err := json.Marshal(heartBeatData) + if err != nil { + glog.Error(err) + return err + } + glog.V(2).Info("send heartbeat to broker look data[", string(data)+"]") + remotingCommand := remoting.NewRemotingCommandWithBody(remoting.HEART_BEAT, nil, data) + glog.V(2).Info("send heartbeat to broker[", addr+"]") + self.sendHeartBeat(addr, remotingCommand, 3000) + + } + } + return nil } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/util/concurrent_map.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/util/concurrent_map.go b/rocketmq-go/util/concurrent_map.go new file mode 100644 index 0000000..2fbe9bf --- /dev/null +++ b/rocketmq-go/util/concurrent_map.go @@ -0,0 +1,278 @@ +package util + +import ( + "encoding/json" + "sync" +) + +var SHARD_COUNT = 33 + +// A "thread" safe map of type string:Anything. +// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards. +type ConcurrentMap []*concurrentMapShared + +// A "thread" safe string to anything map. +type concurrentMapShared struct { + items map[string]interface{} + sync.RWMutex // Read Write mutex, guards access to internal map. +} + +// Creates a new concurrent map. +func New() ConcurrentMap { + m := make(ConcurrentMap, SHARD_COUNT) + for i := 0; i < SHARD_COUNT; i++ { + m[i] = &concurrentMapShared{items: make(map[string]interface{})} + } + return m +} + +// Returns shard under given key +func (m ConcurrentMap) GetShard(key string) *concurrentMapShared { + return m[uint(fnv32(key))%uint(SHARD_COUNT)] +} + +func (m ConcurrentMap) MSet(data map[string]interface{}) { + for key, value := range data { + shard := m.GetShard(key) + shard.Lock() + shard.items[key] = value + shard.Unlock() + } +} + +// Sets the given value under the specified key. +func (m *ConcurrentMap) Set(key string, value interface{}) { + // Get map shard. + shard := m.GetShard(key) + shard.Lock() + shard.items[key] = value + shard.Unlock() +} + +// Callback to return new element to be inserted into the map +// It is called while lock is held, therefore it MUST NOT +// try to access other keys in same map, as it can lead to deadlock since +// Go sync.RWLock is not reentrant +type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{} + +// Insert or Update - updates existing element or inserts a new one using UpsertCb +func (m *ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) { + shard := m.GetShard(key) + shard.Lock() + v, ok := shard.items[key] + res = cb(ok, v, value) + shard.items[key] = res + shard.Unlock() + return res +} + +// Sets the given value under the specified key if no value was associated with it. +func (m *ConcurrentMap) SetIfAbsent(key string, value interface{}) bool { + // Get map shard. + shard := m.GetShard(key) + shard.Lock() + _, ok := shard.items[key] + if !ok { + shard.items[key] = value + } + shard.Unlock() + return !ok +} + +// Retrieves an element from map under given key. +func (m ConcurrentMap) Get(key string) (interface{}, bool) { + // Get shard + shard := m.GetShard(key) + shard.RLock() + // Get item from shard. + val, ok := shard.items[key] + shard.RUnlock() + return val, ok +} + +// Returns the number of elements within the map. +func (m ConcurrentMap) Count() int { + count := 0 + for i := 0; i < SHARD_COUNT; i++ { + shard := m[i] + shard.RLock() + count += len(shard.items) + shard.RUnlock() + } + return count +} + +// Looks up an item under specified key +func (m *ConcurrentMap) Has(key string) bool { + // Get shard + shard := m.GetShard(key) + shard.RLock() + // See if element is within shard. + _, ok := shard.items[key] + shard.RUnlock() + return ok +} + +// Removes an element from the map. +func (m *ConcurrentMap) Remove(key string) { + // Try to get shard. + shard := m.GetShard(key) + shard.Lock() + delete(shard.items, key) + shard.Unlock() +} + +// Removes an element from the map and returns it +func (m *ConcurrentMap) Pop(key string) (v interface{}, exists bool) { + // Try to get shard. + shard := m.GetShard(key) + shard.Lock() + v, exists = shard.items[key] + delete(shard.items, key) + shard.Unlock() + return v, exists +} + +// Checks if map is empty. +func (m *ConcurrentMap) IsEmpty() bool { + return m.Count() == 0 +} + +// Used by the Iter & IterBuffered functions to wrap two variables together over a channel, +type Tuple struct { + Key string + Val interface{} +} + +// Returns an iterator which could be used in a for range loop. +// +// Deprecated: using IterBuffered() will get a better performence +func (m ConcurrentMap) Iter() <-chan Tuple { + ch := make(chan Tuple) + go func() { + wg := sync.WaitGroup{} + wg.Add(SHARD_COUNT) + // Foreach shard. + for _, shard := range m { + go func(shard *concurrentMapShared) { + // Foreach key, value pair. + shard.RLock() + for key, val := range shard.items { + ch <- Tuple{key, val} + } + shard.RUnlock() + wg.Done() + }(shard) + } + wg.Wait() + close(ch) + }() + return ch +} + +// Returns a buffered iterator which could be used in a for range loop. +func (m ConcurrentMap) IterBuffered() <-chan Tuple { + ch := make(chan Tuple, m.Count()) + go func() { + wg := sync.WaitGroup{} + wg.Add(SHARD_COUNT) + // Foreach shard. + for _, shard := range m { + go func(shard *concurrentMapShared) { + // Foreach key, value pair. + shard.RLock() + for key, val := range shard.items { + ch <- Tuple{key, val} + } + shard.RUnlock() + wg.Done() + }(shard) + } + wg.Wait() + close(ch) + }() + return ch +} + +// Returns all items as map[string]interface{} +func (m ConcurrentMap) Items() map[string]interface{} { + tmp := make(map[string]interface{}) + + // Insert items to temporary map. + for item := range m.IterBuffered() { + tmp[item.Key] = item.Val + } + + return tmp +} + +// Iterator callback,called for every key,value found in +// maps. RLock is held for all calls for a given shard +// therefore callback sess consistent view of a shard, +// but not across the shards +type IterCb func(key string, v interface{}) + +// Callback based iterator, cheapest way to read +// all elements in a map. +func (m *ConcurrentMap) IterCb(fn IterCb) { + for idx := range *m { + shard := (*m)[idx] + shard.RLock() + for key, value := range shard.items { + fn(key, value) + } + shard.RUnlock() + } +} + +// Return all keys as []string +func (m ConcurrentMap) Keys() []string { + count := m.Count() + ch := make(chan string, count) + go func() { + // Foreach shard. + wg := sync.WaitGroup{} + wg.Add(SHARD_COUNT) + for _, shard := range m { + go func(shard *concurrentMapShared) { + // Foreach key, value pair. + shard.RLock() + for key := range shard.items { + ch <- key + } + shard.RUnlock() + wg.Done() + }(shard) + } + wg.Wait() + close(ch) + }() + + keys := make([]string, 0, count) + for k := range ch { + keys = append(keys, k) + } + return keys +} + +//Reviles ConcurrentMap "private" variables to json marshal. +func (m ConcurrentMap) MarshalJSON() ([]byte, error) { + // Create a temporary map, which will hold all item spread across shards. + tmp := make(map[string]interface{}) + + // Insert items to temporary map. + for item := range m.IterBuffered() { + tmp[item.Key] = item.Val + } + return json.Marshal(tmp) +} + +func fnv32(key string) uint32 { + hash := uint32(2166136261) + const prime32 = uint32(16777619) + for i := 0; i < len(key); i++ { + hash *= prime32 + hash ^= uint32(key[i]) + } + return hash +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/util/ip.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/util/ip.go b/rocketmq-go/util/ip.go new file mode 100644 index 0000000..f87000d --- /dev/null +++ b/rocketmq-go/util/ip.go @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package util + +import ( + "net" + "strings" +) + +func GetIp4Bytes() (ret []byte) { + ip := getIp() + ret = ip[len(ip)-4:] + return +} + +func GetLocalIp4() (ip4 string) { + ip := getIp() + if ip.To4() != nil { + currIp := ip.String() + if !strings.Contains(currIp, ":") && currIp != "127.0.0.1" && isIntranetIpv4(currIp) { + ip4 = currIp + } + } + return +} +func getIp() (ip net.IP) { + interfaces, err := net.Interfaces() + if err != nil { + return + } + + for _, face := range interfaces { + if strings.Contains(face.Name, "lo") { + continue + } + addrs, err := face.Addrs() + if err != nil { + return + } + + for _, addr := range addrs { + if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To4() != nil { + currIp := ipnet.IP.String() + if !strings.Contains(currIp, ":") && currIp != "127.0.0.1" && isIntranetIpv4(currIp) { + ip = ipnet.IP + } + } + } + } + } + return +} +func isIntranetIpv4(ip string) bool { + if strings.HasPrefix(ip, "192.168.") || strings.HasPrefix(ip, "169.254.") { + return true + } + return false +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/util/json_util.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/util/json_util.go b/rocketmq-go/util/json_util.go new file mode 100644 index 0000000..91fdc5d --- /dev/null +++ b/rocketmq-go/util/json_util.go @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package util + +import ( + "errors" +) + +const ( + STRING = "STRING" + NUMBER = "NUMBER" + + START_OBJ = "START_OBJ" //{ + END_OBJ = "END_OBJ" //} + COMMA = "COMMA" //, + COLON = "COLON" //: + + //// may be next version impl it + //BOOL + //NULL + //START_ARRAY //[ + //END_ARRAY //] + //EOF +) + +type Token struct { + tokenType string + tokenValue string +} + +////{"offsetTable":{{"brokerName":"broker-b","queueId":122222,"topic":"GoLang"}:9420,{"brokerName":"broker-b","queueId":2,"topic":"GoLang"}:9184,{"brokerName":"broker-b","queueId":1,"topic":"GoLang"}:9260,{"brokerName":"broker-b","queueId":3,"topic":"GoLang"}:9139}} + +func GetKvStringMap(str string) (kvMap map[string]string, err error) { + var tokenList []Token + tokenList, err = parseTokenList(str) + kvMap = map[string]string{} + startObjCount := 0 + readType := 0 // 0 begin 1 key 2 value + nowKey := "" + nowValue := "" + for i := 0; i < len(tokenList); i++ { + nowToken := tokenList[i] + if nowToken.tokenType == START_OBJ { + startObjCount++ + } + if nowToken.tokenType == END_OBJ { + startObjCount-- + } + if readType == 0 { + if nowToken.tokenType != START_OBJ { + err = errors.New("json not start with {") + return + } + readType = 1 + } else if readType == 1 { + if nowToken.tokenType == COLON { //: split k and v + if startObjCount == 1 { + readType = 2 + continue + } + } + if nowToken.tokenType == STRING { + nowKey = nowKey + "\"" + nowToken.tokenValue + "\"" + } else { + nowKey = nowKey + nowToken.tokenValue + } + } else if readType == 2 { + if nowToken.tokenType == COMMA { // , split kv pair + if startObjCount == 1 { + kvMap[nowKey] = nowValue + nowKey = "" + nowValue = "" + readType = 1 + continue + } + } + if nowToken.tokenType == STRING { + nowValue = nowValue + "\"" + nowToken.tokenValue + "\"" + + } else { + if startObjCount > 0 { //use less end } + nowValue = nowValue + nowToken.tokenValue + } + } + + } else { + err = errors.New("this is a bug") + return + } + } + if len(nowKey) > 0 { + + kvMap[nowKey] = nowValue + } + return +} + +func parseTokenList(str string) (tokenList []Token, err error) { + + for i := 0; i < len(str); i++ { + c := str[i] + token := Token{} + switch c { + case '{': + token.tokenType = START_OBJ + token.tokenValue = string(c) + break + case '}': + token.tokenType = END_OBJ + token.tokenValue = string(c) + break + case ',': + token.tokenType = COMMA + token.tokenValue = string(c) + break + case ':': + token.tokenType = COLON + token.tokenValue = string(c) + break + case '"': + token.tokenType = STRING + token.tokenValue = "" + for i++; str[i] != '"'; i++ { + token.tokenValue = token.tokenValue + string(str[i]) + } + break + default: + if c == '-' || (str[i] <= '9' && str[i] >= '0') { + token.tokenType = NUMBER + token.tokenValue = string(c) + for i++; str[i] <= '9' && str[i] >= '0'; i++ { + token.tokenValue = token.tokenValue + string(str[i]) + } + i-- + break + } + err = errors.New("INVALID JSON") + return + } + tokenList = append(tokenList, token) + } + return +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/util/message_client_id_generator.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/util/message_client_id_generator.go b/rocketmq-go/util/message_client_id_generator.go new file mode 100644 index 0000000..df4cfb6 --- /dev/null +++ b/rocketmq-go/util/message_client_id_generator.go @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package util + +import ( + "bytes" + "encoding/binary" + "os" + "strconv" + "strings" + "sync" + "time" +) + +var ( + counter int16 = 0 + startTime int64 //this month's first day 12 hour. for example. 2017-01-01 12:00:00 + nextStartTime int64 //next month's first day 12 hour. for example. 2017-02-01 12:00:00 + idPrefix string + lock sync.Mutex +) + +//MessageClientId = ip + pid + classloaderid + counter + time +//4 bytes for ip , +//2 bytes for pid, +//4 bytes for classloaderid(for java,go put 0) + +//2 bytes for counter, +//4 bytes for timediff, //(time.Now().UnixNano() - startTime) / 1000000) divide 1000000 because golang is different with java +func GeneratorMessageClientId() (uniqMessageId string) { + defer lock.Unlock() + lock.Lock() + if len(idPrefix) == 0 { + idPrefix = generatorMessageClientIdPrefix() + } + if time.Now().UnixNano() > nextStartTime { + startTime, nextStartTime = getStartAndNextStartTime() + } + counter = counter + 1 + var buf2 = bytes.NewBuffer([]byte{}) + binary.Write(buf2, binary.BigEndian, int32((time.Now().UnixNano()-startTime)/1000000)) + binary.Write(buf2, binary.BigEndian, counter) + uniqMessageId = idPrefix + bytes2string(buf2.Bytes()) + return +} + +func GeneratorMessageOffsetId(storeHost []byte, port int32, commitOffset int64) (messageOffsetId string) { + var buf = bytes.NewBuffer([]byte{}) + binary.Write(buf, binary.BigEndian, storeHost) + binary.Write(buf, binary.BigEndian, port) + binary.Write(buf, binary.BigEndian, commitOffset) + idPrefix := buf.Bytes() + messageOffsetId = bytes2string(idPrefix) + return +} +func generatorMessageClientIdPrefix() (messageClientIdPrefix string) { + var ( + idPrefix []byte + ip4Bytes []byte + pid int16 + classloaderId int32 = -1 // golang don't have this + ) + ip4Bytes = GetIp4Bytes() + pid = int16(os.Getpid()) + var buf = bytes.NewBuffer([]byte{}) + binary.Write(buf, binary.BigEndian, ip4Bytes) + binary.Write(buf, binary.BigEndian, pid) + binary.Write(buf, binary.BigEndian, classloaderId) + idPrefix = buf.Bytes() + messageClientIdPrefix = bytes2string(idPrefix) + return +} +func getStartAndNextStartTime() (thisMonthFirstDay12 int64, nextMonthFirstDay12 int64) { + now := time.Now() + year := now.Year() + month := now.Month() + thisMonthFirstDay12 = time.Date(year, month, 1, 12, 0, 0, 0, time.Local).UnixNano() + month = month + 1 + if month > 12 { + month = month - 12 + year = year + 1 + } + nextMonthFirstDay12 = time.Date(year, month, 1, 12, 0, 0, 0, time.Local).UnixNano() + return +} +func bytes2string(bytes []byte) (ret string) { + for _, oneByte := range bytes { + hexStr := strconv.FormatInt(int64(oneByte), 16) + if len(hexStr) < 2 { + hexStr = "0" + hexStr + } + ret = ret + hexStr + } + ret = strings.ToUpper(ret) + return +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/util/string_util.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/util/string_util.go b/rocketmq-go/util/string_util.go new file mode 100644 index 0000000..7e31e00 --- /dev/null +++ b/rocketmq-go/util/string_util.go @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package util + +import "strconv" + +func StrToIntWithDefaultValue(str string, defaultValue int) (result int) { + ret, err := strconv.Atoi(str) + if err != nil { + result = defaultValue + } else { + result = ret + } + return +} +func ReadString(obj interface{}) (ret string) { + if obj == nil { + ret = "" + } else { + ret = obj.(string) + } + return +} + +func IntToString(intValue int) (ret string) { + ret = strconv.Itoa(intValue) + return +} + +func StrToInt(str string) (result int, err error) { + result, err = strconv.Atoi(str) + return +} +func StrToInt32(str string) (result int32, err error) { + var ret int64 + ret, err = strconv.ParseInt(str, 10, 32) + result = int32(ret) + return +} +func StrToInt16(str string) (result int16, err error) { + var ret int64 + ret, err = strconv.ParseInt(str, 10, 16) + result = int16(ret) + return +} +func StrToInt64(str string) (result int64, err error) { + result, err = strconv.ParseInt(str, 10, 64) + return +} + +func StrToInt32WithDefaultValue(str string, defaultValue int32) (result int32) { + ret, err := StrToInt32(str) + if err != nil { + result = defaultValue + } else { + result = ret + } + return +} +func StrToInt16WithDefaultValue(str string, defaultValue int16) (result int16) { + ret, err := StrToInt16(str) + if err != nil { + result = defaultValue + } else { + result = ret + } + return +} +func StrToInt64WithDefaultValue(str string, defaultValue int64) (result int64) { + ret, err := StrToInt64(str) + if err != nil { + result = defaultValue + } else { + result = ret + } + return +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/util/structs/field.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/util/structs/field.go b/rocketmq-go/util/structs/field.go new file mode 100644 index 0000000..e697832 --- /dev/null +++ b/rocketmq-go/util/structs/field.go @@ -0,0 +1,141 @@ +package structs + +import ( + "errors" + "fmt" + "reflect" +) + +var ( + errNotExported = errors.New("field is not exported") + errNotSettable = errors.New("field is not settable") +) + +// Field represents a single struct field that encapsulates high level +// functions around the field. +type Field struct { + value reflect.Value + field reflect.StructField + defaultTag string +} + +// Tag returns the value associated with key in the tag string. If there is no +// such key in the tag, Tag returns the empty string. +func (f *Field) Tag(key string) string { + return f.field.Tag.Get(key) +} + +// Value returns the underlying value of the field. It panics if the field +// is not exported. +func (f *Field) Value() interface{} { + return f.value.Interface() +} + +// IsEmbedded returns true if the given field is an anonymous field (embedded) +func (f *Field) IsEmbedded() bool { + return f.field.Anonymous +} + +// IsExported returns true if the given field is exported. +func (f *Field) IsExported() bool { + return f.field.PkgPath == "" +} + +// IsZero returns true if the given field is not initialized (has a zero value). +// It panics if the field is not exported. +func (f *Field) IsZero() bool { + zero := reflect.Zero(f.value.Type()).Interface() + current := f.Value() + + return reflect.DeepEqual(current, zero) +} + +// Name returns the name of the given field +func (f *Field) Name() string { + return f.field.Name +} + +// Kind returns the fields kind, such as "string", "map", "bool", etc .. +func (f *Field) Kind() reflect.Kind { + return f.value.Kind() +} + +// Set sets the field to given value v. It returns an error if the field is not +// settable (not addressable or not exported) or if the given value's type +// doesn't match the fields type. +func (f *Field) Set(val interface{}) error { + // we can't set unexported fields, so be sure this field is exported + if !f.IsExported() { + return errNotExported + } + + // do we get here? not sure... + if !f.value.CanSet() { + return errNotSettable + } + + given := reflect.ValueOf(val) + + if f.value.Kind() != given.Kind() { + return fmt.Errorf("wrong kind. got: %s want: %s", given.Kind(), f.value.Kind()) + } + + f.value.Set(given) + return nil +} + +// Zero sets the field to its zero value. It returns an error if the field is not +// settable (not addressable or not exported). +func (f *Field) Zero() error { + zero := reflect.Zero(f.value.Type()).Interface() + return f.Set(zero) +} + +// Fields returns a slice of Fields. This is particular handy to get the fields +// of a nested struct . A struct tag with the content of "-" ignores the +// checking of that particular field. Example: +// +// // Field is ignored by this package. +// Field *http.Request `structs:"-"` +// +// It panics if field is not exported or if field's kind is not struct +func (f *Field) Fields() []*Field { + return getFields(f.value, f.defaultTag) +} + +// Field returns the field from a nested struct. It panics if the nested struct +// is not exported or if the field was not found. +func (f *Field) Field(name string) *Field { + field, ok := f.FieldOk(name) + if !ok { + panic("field not found") + } + + return field +} + +// FieldOk returns the field from a nested struct. The boolean returns whether +// the field was found (true) or not (false). +func (f *Field) FieldOk(name string) (*Field, bool) { + value := &f.value + // value must be settable so we need to make sure it holds the address of the + // variable and not a copy, so we can pass the pointer to strctVal instead of a + // copy (which is not assigned to any variable, hence not settable). + // see "https://blog.golang.org/laws-of-reflection#TOC_8." + if f.value.Kind() != reflect.Ptr { + a := f.value.Addr() + value = &a + } + v := strctVal(value.Interface()) + t := v.Type() + + field, ok := t.FieldByName(name) + if !ok { + return nil, false + } + + return &Field{ + field: field, + value: v.FieldByName(name), + }, true +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/util/structs/structs.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/util/structs/structs.go b/rocketmq-go/util/structs/structs.go new file mode 100644 index 0000000..87b7970 --- /dev/null +++ b/rocketmq-go/util/structs/structs.go @@ -0,0 +1,581 @@ +// Package structs contains various utilities functions to work with structs. +package structs + +import ( + "fmt" + + "reflect" + "strings" +) + +var ( + // DefaultTagName is the default tag name for struct fields which provides + // a more granular to tweak certain structs. Lookup the necessary functions + // for more info. + DefaultTagName = "structs" // struct's field default tag name +) + +// Struct encapsulates a struct type to provide several high level functions +// around the struct. +type Struct struct { + raw interface{} + value reflect.Value + TagName string +} + +// New returns a new *Struct with the struct s. It panics if the s's kind is +// not struct. +func New(s interface{}) *Struct { + return &Struct{ + raw: s, + value: strctVal(s), + TagName: DefaultTagName, + } +} + +// Map converts the given struct to a map[string]interface{}, where the keys +// of the map are the field names and the values of the map the associated +// values of the fields. The default key string is the struct field name but +// can be changed in the struct field's tag value. The "structs" key in the +// struct's field tag value is the key name. Example: +// +// // Field appears in map as key "myName". +// Name string `structs:"myName"` +// +// A tag value with the content of "-" ignores that particular field. Example: +// +// // Field is ignored by this package. +// Field bool `structs:"-"` +// +// A tag value with the content of "string" uses the stringer to get the value. Example: +// +// // The value will be output of Animal's String() func. +// // Map will panic if Animal does not implement String(). +// Field *Animal `structs:"field,string"` +// +// A tag value with the option of "flatten" used in a struct field is to flatten its fields +// in the output map. Example: +// +// // The FieldStruct's fields will be flattened into the output map. +// FieldStruct time.Time `structs:",flatten"` +// +// A tag value with the option of "omitnested" stops iterating further if the type +// is a struct. Example: +// +// // Field is not processed further by this package. +// Field time.Time `structs:"myName,omitnested"` +// Field *http.Request `structs:",omitnested"` +// +// A tag value with the option of "omitempty" ignores that particular field if +// the field value is empty. Example: +// +// // Field appears in map as key "myName", but the field is +// // skipped if empty. +// Field string `structs:"myName,omitempty"` +// +// // Field appears in map as key "Field" (the default), but +// // the field is skipped if empty. +// Field string `structs:",omitempty"` +// +// Note that only exported fields of a struct can be accessed, non exported +// fields will be neglected. +func (s *Struct) Map() map[string]interface{} { + out := make(map[string]interface{}) + s.FillMap(out) + return out +} + +// FillMap is the same as Map. Instead of returning the output, it fills the +// given map. +func (s *Struct) FillMap(out map[string]interface{}) { + if out == nil { + return + } + + fields := s.structFields() + + for _, field := range fields { + name := field.Name + smallName := strings.Replace(name, string(name[0]), string(strings.ToLower(string(name[0]))), 1) //todo + val := s.value.FieldByName(name) + isSubStruct := false + var finalVal interface{} + + tagName, tagOpts := parseTag(field.Tag.Get(s.TagName)) + if tagName != "" { + name = tagName + } + + // if the value is a zero value and the field is marked as omitempty do + // not include + if tagOpts.Has("omitempty") { + zero := reflect.Zero(val.Type()).Interface() + current := val.Interface() + + if reflect.DeepEqual(current, zero) { + continue + } + } + + if !tagOpts.Has("omitnested") { + finalVal = s.nested(val) + + v := reflect.ValueOf(val.Interface()) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + + switch v.Kind() { + case reflect.Map, reflect.Struct: + isSubStruct = true + } + } else { + finalVal = val.Interface() + } + + if tagOpts.Has("string") { + s, ok := val.Interface().(fmt.Stringer) + if ok { + out[smallName] = s.String() + } + continue + } + + if isSubStruct && (tagOpts.Has("flatten")) { + for k := range finalVal.(map[string]interface{}) { + out[k] = finalVal.(map[string]interface{})[k] + } + } else { + out[smallName] = finalVal + } + } +} + +// Values converts the given s struct's field values to a []interface{}. A +// struct tag with the content of "-" ignores the that particular field. +// Example: +// +// // Field is ignored by this package. +// Field int `structs:"-"` +// +// A value with the option of "omitnested" stops iterating further if the type +// is a struct. Example: +// +// // Fields is not processed further by this package. +// Field time.Time `structs:",omitnested"` +// Field *http.Request `structs:",omitnested"` +// +// A tag value with the option of "omitempty" ignores that particular field and +// is not added to the values if the field value is empty. Example: +// +// // Field is skipped if empty +// Field string `structs:",omitempty"` +// +// Note that only exported fields of a struct can be accessed, non exported +// fields will be neglected. +func (s *Struct) Values() []interface{} { + fields := s.structFields() + + var t []interface{} + + for _, field := range fields { + val := s.value.FieldByName(field.Name) + + _, tagOpts := parseTag(field.Tag.Get(s.TagName)) + + // if the value is a zero value and the field is marked as omitempty do + // not include + if tagOpts.Has("omitempty") { + zero := reflect.Zero(val.Type()).Interface() + current := val.Interface() + + if reflect.DeepEqual(current, zero) { + continue + } + } + + if tagOpts.Has("string") { + s, ok := val.Interface().(fmt.Stringer) + if ok { + t = append(t, s.String()) + } + continue + } + + if IsStruct(val.Interface()) && !tagOpts.Has("omitnested") { + // look out for embedded structs, and convert them to a + // []interface{} to be added to the final values slice + for _, embeddedVal := range Values(val.Interface()) { + t = append(t, embeddedVal) + } + } else { + t = append(t, val.Interface()) + } + } + + return t +} + +// Fields returns a slice of Fields. A struct tag with the content of "-" +// ignores the checking of that particular field. Example: +// +// // Field is ignored by this package. +// Field bool `structs:"-"` +// +// It panics if s's kind is not struct. +func (s *Struct) Fields() []*Field { + return getFields(s.value, s.TagName) +} + +// Names returns a slice of field names. A struct tag with the content of "-" +// ignores the checking of that particular field. Example: +// +// // Field is ignored by this package. +// Field bool `structs:"-"` +// +// It panics if s's kind is not struct. +func (s *Struct) Names() []string { + fields := getFields(s.value, s.TagName) + + names := make([]string, len(fields)) + + for i, field := range fields { + names[i] = field.Name() + } + + return names +} + +func getFields(v reflect.Value, tagName string) []*Field { + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + + t := v.Type() + + var fields []*Field + + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + + if tag := field.Tag.Get(tagName); tag == "-" { + continue + } + + f := &Field{ + field: field, + value: v.FieldByName(field.Name), + } + + fields = append(fields, f) + + } + + return fields +} + +// Field returns a new Field struct that provides several high level functions +// around a single struct field entity. It panics if the field is not found. +func (s *Struct) Field(name string) *Field { + f, ok := s.FieldOk(name) + if !ok { + panic("field not found") + } + + return f +} + +// FieldOk returns a new Field struct that provides several high level functions +// around a single struct field entity. The boolean returns true if the field +// was found. +func (s *Struct) FieldOk(name string) (*Field, bool) { + t := s.value.Type() + + field, ok := t.FieldByName(name) + if !ok { + return nil, false + } + + return &Field{ + field: field, + value: s.value.FieldByName(name), + defaultTag: s.TagName, + }, true +} + +// IsZero returns true if all fields in a struct is a zero value (not +// initialized) A struct tag with the content of "-" ignores the checking of +// that particular field. Example: +// +// // Field is ignored by this package. +// Field bool `structs:"-"` +// +// A value with the option of "omitnested" stops iterating further if the type +// is a struct. Example: +// +// // Field is not processed further by this package. +// Field time.Time `structs:"myName,omitnested"` +// Field *http.Request `structs:",omitnested"` +// +// Note that only exported fields of a struct can be accessed, non exported +// fields will be neglected. It panics if s's kind is not struct. +func (s *Struct) IsZero() bool { + fields := s.structFields() + + for _, field := range fields { + val := s.value.FieldByName(field.Name) + + _, tagOpts := parseTag(field.Tag.Get(s.TagName)) + + if IsStruct(val.Interface()) && !tagOpts.Has("omitnested") { + ok := IsZero(val.Interface()) + if !ok { + return false + } + + continue + } + + // zero value of the given field, such as "" for string, 0 for int + zero := reflect.Zero(val.Type()).Interface() + + // current value of the given field + current := val.Interface() + + if !reflect.DeepEqual(current, zero) { + return false + } + } + + return true +} + +// HasZero returns true if a field in a struct is not initialized (zero value). +// A struct tag with the content of "-" ignores the checking of that particular +// field. Example: +// +// // Field is ignored by this package. +// Field bool `structs:"-"` +// +// A value with the option of "omitnested" stops iterating further if the type +// is a struct. Example: +// +// // Field is not processed further by this package. +// Field time.Time `structs:"myName,omitnested"` +// Field *http.Request `structs:",omitnested"` +// +// Note that only exported fields of a struct can be accessed, non exported +// fields will be neglected. It panics if s's kind is not struct. +func (s *Struct) HasZero() bool { + fields := s.structFields() + + for _, field := range fields { + val := s.value.FieldByName(field.Name) + + _, tagOpts := parseTag(field.Tag.Get(s.TagName)) + + if IsStruct(val.Interface()) && !tagOpts.Has("omitnested") { + ok := HasZero(val.Interface()) + if ok { + return true + } + + continue + } + + // zero value of the given field, such as "" for string, 0 for int + zero := reflect.Zero(val.Type()).Interface() + + // current value of the given field + current := val.Interface() + + if reflect.DeepEqual(current, zero) { + return true + } + } + + return false +} + +// Name returns the structs's type name within its package. For more info refer +// to Name() function. +func (s *Struct) Name() string { + return s.value.Type().Name() +} + +// structFields returns the exported struct fields for a given s struct. This +// is a convenient helper method to avoid duplicate code in some of the +// functions. +func (s *Struct) structFields() []reflect.StructField { + t := s.value.Type() + + var f []reflect.StructField + + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + // we can't access the value of unexported fields + if field.PkgPath != "" { + continue + } + + // don't check if it's omitted + if tag := field.Tag.Get(s.TagName); tag == "-" { + continue + } + + f = append(f, field) + } + + return f +} + +func strctVal(s interface{}) reflect.Value { + v := reflect.ValueOf(s) + + // if pointer get the underlying element⤠+ for v.Kind() == reflect.Ptr { + v = v.Elem() + } + + if v.Kind() != reflect.Struct { + panic("not struct") + } + + return v +} + +// Map converts the given struct to a map[string]interface{}. For more info +// refer to Struct types Map() method. It panics if s's kind is not struct. +func Map(s interface{}) map[string]interface{} { + return New(s).Map() +} + +// FillMap is the same as Map. Instead of returning the output, it fills the +// given map. +func FillMap(s interface{}, out map[string]interface{}) { + New(s).FillMap(out) +} + +// Values converts the given struct to a []interface{}. For more info refer to +// Struct types Values() method. It panics if s's kind is not struct. +func Values(s interface{}) []interface{} { + return New(s).Values() +} + +// Fields returns a slice of *Field. For more info refer to Struct types +// Fields() method. It panics if s's kind is not struct. +func Fields(s interface{}) []*Field { + return New(s).Fields() +} + +// Names returns a slice of field names. For more info refer to Struct types +// Names() method. It panics if s's kind is not struct. +func Names(s interface{}) []string { + return New(s).Names() +} + +// IsZero returns true if all fields is equal to a zero value. For more info +// refer to Struct types IsZero() method. It panics if s's kind is not struct. +func IsZero(s interface{}) bool { + return New(s).IsZero() +} + +// HasZero returns true if any field is equal to a zero value. For more info +// refer to Struct types HasZero() method. It panics if s's kind is not struct. +func HasZero(s interface{}) bool { + return New(s).HasZero() +} + +// IsStruct returns true if the given variable is a struct or a pointer to +// struct. +func IsStruct(s interface{}) bool { + v := reflect.ValueOf(s) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + + // uninitialized zero value of a struct + if v.Kind() == reflect.Invalid { + return false + } + + return v.Kind() == reflect.Struct +} + +// Name returns the structs's type name within its package. It returns an +// empty string for unnamed types. It panics if s's kind is not struct. +func Name(s interface{}) string { + return New(s).Name() +} + +// nested retrieves recursively all types for the given value and returns the +// nested value. +func (s *Struct) nested(val reflect.Value) interface{} { + var finalVal interface{} + + v := reflect.ValueOf(val.Interface()) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + + switch v.Kind() { + case reflect.Struct: + n := New(val.Interface()) + n.TagName = s.TagName + m := n.Map() + + // do not add the converted value if there are no exported fields, ie: + // time.Time + if len(m) == 0 { + finalVal = val.Interface() + } else { + finalVal = m + } + case reflect.Map: + v := val.Type().Elem() + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + + // only iterate over struct types, ie: map[string]StructType, + // map[string][]StructType, + if v.Kind() == reflect.Struct || + (v.Kind() == reflect.Slice && v.Elem().Kind() == reflect.Struct) { + m := make(map[string]interface{}, val.Len()) + for _, k := range val.MapKeys() { + m[k.String()] = s.nested(val.MapIndex(k)) + } + finalVal = m + break + } + + // TODO(arslan): should this be optional? + finalVal = val.Interface() + case reflect.Slice, reflect.Array: + if val.Type().Kind() == reflect.Interface { + finalVal = val.Interface() + break + } + + // TODO(arslan): should this be optional? + // do not iterate of non struct types, just pass the value. Ie: []int, + // []string, co... We only iterate further if it's a struct. + // i.e []foo or []*foo + if val.Type().Elem().Kind() != reflect.Struct && + !(val.Type().Elem().Kind() == reflect.Ptr && + val.Type().Elem().Elem().Kind() == reflect.Struct) { + finalVal = val.Interface() + break + } + + slices := make([]interface{}, val.Len(), val.Len()) + for x := 0; x < val.Len(); x++ { + slices[x] = s.nested(val.Index(x)) + } + finalVal = slices + default: + finalVal = val.Interface() + } + + return finalVal +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/aaa0758e/rocketmq-go/util/structs/tags.go ---------------------------------------------------------------------- diff --git a/rocketmq-go/util/structs/tags.go b/rocketmq-go/util/structs/tags.go new file mode 100644 index 0000000..8859341 --- /dev/null +++ b/rocketmq-go/util/structs/tags.go @@ -0,0 +1,32 @@ +package structs + +import "strings" + +// tagOptions contains a slice of tag options +type tagOptions []string + +// Has returns true if the given optiton is available in tagOptions +func (t tagOptions) Has(opt string) bool { + for _, tagOpt := range t { + if tagOpt == opt { + return true + } + } + + return false +} + +// parseTag splits a struct field's tag into its name and a list of options +// which comes after a name. A tag is in the form of: "name,option1,option2". +// The name can be neglectected. +func parseTag(tag string) (string, tagOptions) { + // tag is one of followings: + // "" + // "name" + // "name,opt" + // "name,opt,opt2" + // ",opt" + + res := strings.Split(tag, ",") + return res[0], res[1:] +}
