This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 0aa7eaf  [ISSUE #75] Support Statistics of client (#113)
0aa7eaf is described below

commit 0aa7eaf64d09195f7da2c314dee6512aad3c9bea
Author: 高峰 <[email protected]>
AuthorDate: Mon Jul 15 09:04:47 2019 +0800

    [ISSUE #75] Support Statistics of client (#113)
    
    * complete client statistic feature
    
    * fix statistic caller
    
    * fix typo
---
 consumer/push_consumer.go |   6 +-
 consumer/statistics.go    | 393 ++++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 382 insertions(+), 17 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 9000651..477c3e6 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -455,8 +455,8 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                        prevRequestOffset := request.nextOffset
                        request.nextOffset = result.NextBeginOffset
 
-                       rt := time.Now().Sub(beginTime)
-                       increasePullRT(pc.consumerGroup, request.mq.Topic, rt)
+                       rt := time.Now().Sub(beginTime) / time.Millisecond
+                       increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))
 
                        result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
 
@@ -658,7 +658,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
                        }
 
                        // TODO hook
-                       increaseConsumeRT(pc.consumerGroup, mq.Topic, consumeRT)
+                       increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
 
                        if !pq.dropped {
                                msgBackFailed := make([]*primitive.MessageExt, 0)
diff --git a/consumer/statistics.go b/consumer/statistics.go
index 29045a0..adc7b38 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -17,44 +17,409 @@ limitations under the License.
 
 package consumer
 
-import "time"
+import (
+       "container/list"
+       "github.com/apache/rocketmq-client-go/rlog"
+       "sync"
+       "sync/atomic"
+       "time"
+)
 
 var (
-       topicAndGroupConsumeOKTPS     = &statsItemSet{statsName: "CONSUME_OK_TPS"}
-       topicAndGroupConsumeRT        = &statsItemSet{statsName: "CONSUME_FAILED_TPS"}
-       topicAndGroupConsumeFailedTPS = &statsItemSet{statsName: "CONSUME_RT"}
-       topicAndGroupPullTPS          = &statsItemSet{statsName: "PULL_TPS"}
-       topicAndGroupPullRT           = &statsItemSet{statsName: "PULL_RT"}
+       csListLock sync.Mutex
+
+       topicAndGroupConsumeOKTPS     *statsItemSet
+       topicAndGroupConsumeRT        *statsItemSet
+       topicAndGroupConsumeFailedTPS *statsItemSet
+       topicAndGroupPullTPS          *statsItemSet
+       topicAndGroupPullRT           *statsItemSet
 )
 
-type statsItem struct {
+func init() {
+       topicAndGroupConsumeOKTPS = newStatsItemSet("CONSUME_OK_TPS")
+       topicAndGroupConsumeRT = newStatsItemSet("CONSUME_FAILED_TPS")
+       topicAndGroupConsumeFailedTPS = newStatsItemSet("CONSUME_RT")
+       topicAndGroupPullTPS = newStatsItemSet("PULL_TPS")
+       topicAndGroupPullRT = newStatsItemSet("PULL_RT")
+}
+
+type ConsumeStatus struct {
+       PullRT            float64
+       PullTPS           float64
+       ConsumeRT         float64
+       ConsumeOKTPS      float64
+       ConsumeFailedTPS  float64
+       ConsumeFailedMsgs int64
+}
+
+func increasePullRT(group, topic string, rt int64) {
+       topicAndGroupPullRT.addValue(topic+"@"+group, rt, 1)
+}
+
+func increasePullTPS(group, topic string, msgs int) {
+       topicAndGroupPullTPS.addValue(topic+"@"+group, int64(msgs), 1)
+}
+
+func increaseConsumeRT(group, topic string, rt int64) {
+       topicAndGroupConsumeRT.addValue(topic+"@"+group, rt, 1)
+}
+
+func increaseConsumeOKTPS(group, topic string, msgs int) {
+       topicAndGroupConsumeOKTPS.addValue(topic+"@"+group, int64(msgs), 1)
+}
+
+func increaseConsumeFailedTPS(group, topic string, msgs int) {
+       topicAndGroupConsumeFailedTPS.addValue(topic+"@"+group, int64(msgs), 1)
+}
+
+func GetConsumeStatus(group, topic string) ConsumeStatus {
+       cs := ConsumeStatus{}
+       ss := getPullRT(group, topic)
+       cs.PullTPS = ss.tps
+
+       ss = getPullTPS(group, topic)
+       cs.PullTPS = ss.tps
+
+       ss = getConsumeRT(group, topic)
+       cs.ConsumeRT = ss.avgpt
+
+       ss = getConsumeOKTPS(group, topic)
+       cs.ConsumeOKTPS = ss.tps
+
+       ss = getConsumeFailedTPS(group, topic)
+
+       cs.ConsumeFailedTPS = ss.tps
+
+       ss = topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + group)
+       cs.ConsumeFailedMsgs = ss.sum
+       return cs
+}
+
+func ShutDownStatis() {
+       topicAndGroupConsumeOKTPS.closed = true
+       topicAndGroupConsumeRT.closed = true
+       topicAndGroupConsumeFailedTPS.closed = true
+       topicAndGroupPullTPS.closed = true
+       topicAndGroupPullRT.closed = true
+}
+
+func getPullRT(group, topic string) statsSnapshot {
+       return topicAndGroupPullRT.getStatsDataInMinute(topic + "@" + group)
+}
+
+func getPullTPS(group, topic string) statsSnapshot {
+       return topicAndGroupPullTPS.getStatsDataInMinute(topic + "@" + group)
+}
+
+func getConsumeRT(group, topic string) statsSnapshot {
+       ss := topicAndGroupPullRT.getStatsDataInMinute(topic + "@" + group)
+       if ss.sum == 0 {
+               return topicAndGroupConsumeRT.getStatsDataInHour(topic + "@" + group)
+       }
+       return ss
+}
+
+func getConsumeOKTPS(group, topic string) statsSnapshot {
+       return topicAndGroupConsumeOKTPS.getStatsDataInMinute(topic + "@" + group)
+}
+
+func getConsumeFailedTPS(group, topic string) statsSnapshot {
+       return topicAndGroupConsumeFailedTPS.getStatsDataInMinute(topic + "@" + group)
 }
 
 type statsItemSet struct {
        statsName      string
-       statsItemTable map[string]statsItem
+       statsItemTable sync.Map
+       closed         bool
+}
+
+func newStatsItemSet(statsName string) *statsItemSet {
+       sis := &statsItemSet{
+               statsName: statsName,
+       }
+       sis.init()
+       return sis
+}
+
+func (sis *statsItemSet) init() {
+       go func() {
+               for !sis.closed {
+                       sis.samplingInSeconds()
+                       time.Sleep(10 * time.Second)
+               }
+       }()
+
+       go func() {
+               for !sis.closed {
+                       sis.samplingInMinutes()
+                       time.Sleep(10 * time.Minute)
+               }
+       }()
+
+       go func() {
+               for !sis.closed {
+                       sis.samplingInHour()
+                       time.Sleep(time.Hour)
+               }
+       }()
+
+       go func() {
+               time.Sleep(nextMinutesTime().Sub(time.Now()))
+               for !sis.closed {
+                       sis.printAtMinutes()
+                       time.Sleep(time.Minute)
+               }
+       }()
+
+       go func() {
+               time.Sleep(nextHourTime().Sub(time.Now()))
+               for !sis.closed {
+                       sis.printAtHour()
+                       time.Sleep(time.Hour)
+               }
+       }()
+
+       go func() {
+               time.Sleep(nextMonthTime().Sub(time.Now()))
+               for !sis.closed {
+                       sis.printAtDay()
+                       time.Sleep(24 * time.Hour)
+               }
+       }()
+}
+
+
+func (sis *statsItemSet) samplingInSeconds() {
+       sis.statsItemTable.Range(func(key, value interface{}) bool {
+               si := value.(*statsItem)
+               si.samplingInSeconds()
+               return true
+       })
+}
+
+func (sis *statsItemSet) samplingInMinutes() {
+       sis.statsItemTable.Range(func(key, value interface{}) bool {
+               si := value.(*statsItem)
+               si.samplingInMinutes()
+               return true
+       })
+}
+
+func (sis *statsItemSet) samplingInHour() {
+       sis.statsItemTable.Range(func(key, value interface{}) bool {
+               si := value.(*statsItem)
+               si.samplingInHour()
+               return true
+       })
+}
+
+func (sis *statsItemSet) printAtMinutes() {
+       sis.statsItemTable.Range(func(key, value interface{}) bool {
+               si := value.(*statsItem)
+               si.printAtMinutes()
+               return true
+       })
+}
+
+func (sis *statsItemSet) printAtHour() {
+       sis.statsItemTable.Range(func(key, value interface{}) bool {
+               si := value.(*statsItem)
+               si.printAtHour()
+               return true
+       })
 }
 
-func (set *statsItemSet) addValue(key string, incValue, incTimes int) {
+func (sis *statsItemSet) printAtDay() {
+       sis.statsItemTable.Range(func(key, value interface{}) bool {
+               si := value.(*statsItem)
+               si.printAtDay()
+               return true
+       })
+}
 
+func (sis *statsItemSet) addValue(key string, incValue, incTimes int64) {
+       si := sis.getAndCreateStateItem(key)
+       atomic.AddInt64(&si.value, incValue)
+       atomic.AddInt64(&si.times, incTimes)
 }
 
-func increasePullRT(group, topic string, rt time.Duration) {
+func (sis *statsItemSet) getAndCreateStateItem(key string) *statsItem {
+       if val, ok := sis.statsItemTable.Load(key); ok {
+               return val.(*statsItem)
+       } else {
+               si := newStatsItem(sis.statsName, key)
+               sis.statsItemTable.Store(key, si)
+               return si
+       }
+}
 
+func (sis *statsItemSet) getStatsDataInMinute(key string) statsSnapshot {
+       if val, ok := sis.statsItemTable.Load(key); ok {
+               si := val.(*statsItem)
+               return si.getStatsDataInMinute()
+       }
+       return statsSnapshot{}
 }
 
-func increaseConsumeRT(group, topic string, rt time.Duration) {
+func (sis *statsItemSet) getStatsDataInHour(key string) statsSnapshot {
+       if val, ok := sis.statsItemTable.Load(key); ok {
+               si := val.(*statsItem)
+               return si.getStatsDataInHour()
+       }
+       return statsSnapshot{}
+}
 
+func (sis *statsItemSet) getStatsDataInDay(key string) statsSnapshot {
+       if val, ok := sis.statsItemTable.Load(key); ok {
+               si := val.(*statsItem)
+               return si.getStatsDataInDay()
+       }
+       return statsSnapshot{}
 }
 
-func increasePullTPS(group, topic string, msgNumber int) {
+func (sis *statsItemSet) getStatsItem(key string) *statsItem {
+       val, _ := sis.statsItemTable.Load(key)
+       return val.(*statsItem)
+}
 
+type statsItem struct {
+       value            int64
+       times            int64
+       csListMinute     *list.List
+       csListHour       *list.List
+       csListDay        *list.List
+       statsName        string
+       statsKey         string
+       csListMinuteLock sync.Mutex
+       csListHourLock   sync.Mutex
+       csListDayLock    sync.Mutex
 }
 
-func increaseConsumeOKTPS(group, topic string, msgNumber int) {
+func (si *statsItem) getStatsDataInMinute() statsSnapshot {
+       return computeStatsData(si.csListMinute)
+}
+
+func (si *statsItem) getStatsDataInHour() statsSnapshot {
+       return computeStatsData(si.csListHour)
+}
+
+func (si *statsItem) getStatsDataInDay() statsSnapshot {
+       return computeStatsData(si.csListDay)
+}
 
+func newStatsItem(statsName, statsKey string) *statsItem {
+       return &statsItem{
+               statsName:    statsName,
+               statsKey:     statsKey,
+               csListMinute: list.New(),
+               csListHour:   list.New(),
+               csListDay:    list.New(),
+       }
 }
 
-func increaseConsumeFailedTPS(group, topic string, msgNumber int) {
+func (si *statsItem) samplingInSeconds() {
+       si.csListMinuteLock.Lock()
+       defer si.csListMinuteLock.Unlock()
+       si.csListMinute.PushBack(callSnapshot{
+               timestamp: time.Now().Unix() * 1000,
+               time:      atomic.LoadInt64(&si.times),
+               value:     atomic.LoadInt64(&si.value),
+       })
+       if si.csListMinute.Len() > 7 {
+               si.csListMinute.Remove(si.csListMinute.Front())
+       }
+}
+
+func (si *statsItem) samplingInMinutes() {
+       si.csListHourLock.Lock()
+       defer si.csListHourLock.Unlock()
+       si.csListHour.PushBack(callSnapshot{
+               timestamp: time.Now().Unix() * 1000,
+               time:      atomic.LoadInt64(&si.times),
+               value:     atomic.LoadInt64(&si.value),
+       })
+       if si.csListHour.Len() > 7 {
+               si.csListHour.Remove(si.csListHour.Front())
+       }
+}
+
+func (si *statsItem) samplingInHour() {
+       si.csListDayLock.Lock()
+       defer si.csListDayLock.Unlock()
+       si.csListDay.PushBack(callSnapshot{
+               timestamp: time.Now().Unix() * 1000,
+               time:      atomic.LoadInt64(&si.times),
+               value:     atomic.LoadInt64(&si.value),
+       })
+       if si.csListDay.Len() > 25 {
+               si.csListHour.Remove(si.csListDay.Front())
+       }
+}
+
+func (si *statsItem) printAtMinutes() {
+       ss := computeStatsData(si.csListMinute)
+       rlog.Infof("[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.2f",
+               si.statsName, si.statsKey, ss.sum, ss.tps, ss.avgpt)
+}
+
+func (si *statsItem) printAtHour() {
+       ss := computeStatsData(si.csListHour)
+       rlog.Infof("[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.2f",
+               si.statsName, si.statsKey, ss.sum, ss.tps, ss.avgpt)
+}
+
+func (si *statsItem) printAtDay() {
+       ss := computeStatsData(si.csListDay)
+       rlog.Infof("[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.2f",
+               si.statsName, si.statsKey, ss.sum, ss.tps, ss.avgpt)
+}
+
+func nextMinutesTime() time.Time {
+       now := time.Now()
+       m, _ := time.ParseDuration("1m")
+       return now.Add(m)
+}
+
+func nextHourTime() time.Time {
+       now := time.Now()
+       m, _ := time.ParseDuration("1h")
+       return now.Add(m)
+}
+
+func nextMonthTime() time.Time {
+       now := time.Now()
+       return now.AddDate(0, 1, 0)
+}
+
+func computeStatsData(csList *list.List) statsSnapshot {
+       csListLock.Lock()
+       defer csListLock.Unlock()
+       tps, avgpt, sum := 0.0, 0.0, int64(0)
+       if csList.Len() > 0 {
+               first := csList.Front().Value.(callSnapshot)
+               last := csList.Back().Value.(callSnapshot)
+               sum = last.value - first.value
+               tps = float64(sum*1000.0) / float64(last.timestamp-first.timestamp)
+               timesDiff := last.time - first.time
+               if timesDiff > 0 {
+                       avgpt = float64(sum*1.0) / float64(timesDiff)
+               }
+       }
+       return statsSnapshot{
+               tps:   tps,
+               avgpt: avgpt,
+               sum:   sum,
+       }
+}
+
+type callSnapshot struct {
+       timestamp int64
+       time      int64
+       value     int64
+}
 
+type statsSnapshot struct {
+       sum   int64
+       tps   float64
+       avgpt float64
 }

Reply via email to