This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 2ace48a  [ISSUE #356] feat(consumer): redesign stat (#357)
2ace48a is described below

commit 2ace48afd7c054fa6231e28407f7e40f76c84ad6
Author: xujianhai666 <[email protected]>
AuthorDate: Mon Jan 6 10:09:43 2020 +0800

    [ISSUE #356] feat(consumer): redesign stat (#357)
    
    * feat(consumer): redesign stat
---
 consumer/statistics.go | 89 ++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 65 insertions(+), 24 deletions(-)

diff --git a/consumer/statistics.go b/consumer/statistics.go
index 2448c74..a58ff9e 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -29,6 +29,7 @@ import (
 
 var (
        csListLock sync.Mutex
+       closeOnce  sync.Once
 
        topicAndGroupConsumeOKTPS     *statsItemSet
        topicAndGroupConsumeRT        *statsItemSet
@@ -98,11 +99,13 @@ func GetConsumeStatus(group, topic string) ConsumeStatus {
 }
 
 func ShutDownStatis() {
-       topicAndGroupConsumeOKTPS.closed = true
-       topicAndGroupConsumeRT.closed = true
-       topicAndGroupConsumeFailedTPS.closed = true
-       topicAndGroupPullTPS.closed = true
-       topicAndGroupPullRT.closed = true
+       closeOnce.Do(func() {
+               close(topicAndGroupConsumeOKTPS.closed)
+               close(topicAndGroupConsumeRT.closed)
+               close(topicAndGroupConsumeFailedTPS.closed)
+               close(topicAndGroupPullTPS.closed)
+               close(topicAndGroupPullRT.closed)
+       })
 }
 
 func getPullRT(group, topic string) statsSnapshot {
@@ -132,12 +135,13 @@ func getConsumeFailedTPS(group, topic string) 
statsSnapshot {
 type statsItemSet struct {
        statsName      string
        statsItemTable sync.Map
-       closed         bool
+       closed         chan struct{}
 }
 
 func newStatsItemSet(statsName string) *statsItemSet {
        sis := &statsItemSet{
                statsName: statsName,
+               closed:    make(chan struct{}),
        }
        sis.init()
        return sis
@@ -145,47 +149,84 @@ func newStatsItemSet(statsName string) *statsItemSet {
 
 func (sis *statsItemSet) init() {
        go func() {
-               for !sis.closed {
-                       sis.samplingInSeconds()
-                       time.Sleep(10 * time.Second)
+               ticker := time.NewTicker(10 * time.Second)
+               defer ticker.Stop()
+               for {
+                       select {
+                       case <-sis.closed:
+                               return
+                       case <-ticker.C:
+                               sis.samplingInSeconds()
+
+                       }
                }
        }()
 
        go func() {
-               for !sis.closed {
-                       sis.samplingInMinutes()
-                       time.Sleep(10 * time.Minute)
+               ticker := time.NewTicker(10 * time.Minute)
+               defer ticker.Stop()
+               for {
+                       select {
+                       case <-sis.closed:
+                               return
+                       case <-ticker.C:
+                               sis.samplingInMinutes()
+                       }
                }
        }()
 
        go func() {
-               for !sis.closed {
-                       sis.samplingInHour()
-                       time.Sleep(time.Hour)
+               ticker := time.NewTicker(time.Hour)
+               defer ticker.Stop()
+               for {
+                       select {
+                       case <-sis.closed:
+                               return
+                       case <-ticker.C:
+                               sis.samplingInHour()
+                       }
                }
        }()
 
        go func() {
                time.Sleep(nextMinutesTime().Sub(time.Now()))
-               for !sis.closed {
-                       sis.printAtMinutes()
-                       time.Sleep(time.Minute)
+               ticker := time.NewTicker(time.Minute)
+               defer ticker.Stop()
+               for {
+                       select {
+                       case <-sis.closed:
+                               return
+                       case <-ticker.C:
+                               sis.printAtMinutes()
+                       }
                }
        }()
 
        go func() {
                time.Sleep(nextHourTime().Sub(time.Now()))
-               for !sis.closed {
-                       sis.printAtHour()
-                       time.Sleep(time.Hour)
+               ticker := time.NewTicker(time.Hour)
+               defer ticker.Stop()
+               for {
+                       select {
+                       case <-sis.closed:
+                               return
+                       case <-ticker.C:
+                               sis.printAtHour()
+                       }
                }
        }()
 
        go func() {
                time.Sleep(nextMonthTime().Sub(time.Now()))
-               for !sis.closed {
-                       sis.printAtDay()
-                       time.Sleep(24 * time.Hour)
+               ticker := time.NewTicker(24 * time.Hour)
+               defer ticker.Stop()
+               for {
+                       select {
+                       case <-sis.closed:
+                               return
+                       case <-ticker.C:
+                               sis.printAtDay()
+                       }
                }
        }()
 }

Reply via email to