This is an automated email from the ASF dual-hosted git repository.
wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 2ace48a [ISSUE #356] feat(consumer): redesign stat (#357)
2ace48a is described below
commit 2ace48afd7c054fa6231e28407f7e40f76c84ad6
Author: xujianhai666 <[email protected]>
AuthorDate: Mon Jan 6 10:09:43 2020 +0800
[ISSUE #356] feat(consumer): redesign stat (#357)
* feat(consumer): redesign stat
---
consumer/statistics.go | 89 ++++++++++++++++++++++++++++++++++++--------------
1 file changed, 65 insertions(+), 24 deletions(-)
diff --git a/consumer/statistics.go b/consumer/statistics.go
index 2448c74..a58ff9e 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -29,6 +29,7 @@ import (
var (
csListLock sync.Mutex
+ closeOnce sync.Once
topicAndGroupConsumeOKTPS *statsItemSet
topicAndGroupConsumeRT *statsItemSet
@@ -98,11 +99,13 @@ func GetConsumeStatus(group, topic string) ConsumeStatus {
}
func ShutDownStatis() {
- topicAndGroupConsumeOKTPS.closed = true
- topicAndGroupConsumeRT.closed = true
- topicAndGroupConsumeFailedTPS.closed = true
- topicAndGroupPullTPS.closed = true
- topicAndGroupPullRT.closed = true
+ closeOnce.Do(func() {
+ close(topicAndGroupConsumeOKTPS.closed)
+ close(topicAndGroupConsumeRT.closed)
+ close(topicAndGroupConsumeFailedTPS.closed)
+ close(topicAndGroupPullTPS.closed)
+ close(topicAndGroupPullRT.closed)
+ })
}
func getPullRT(group, topic string) statsSnapshot {
@@ -132,12 +135,13 @@ func getConsumeFailedTPS(group, topic string)
statsSnapshot {
type statsItemSet struct {
statsName string
statsItemTable sync.Map
- closed bool
+ closed chan struct{}
}
func newStatsItemSet(statsName string) *statsItemSet {
sis := &statsItemSet{
statsName: statsName,
+ closed: make(chan struct{}),
}
sis.init()
return sis
@@ -145,47 +149,84 @@ func newStatsItemSet(statsName string) *statsItemSet {
func (sis *statsItemSet) init() {
go func() {
- for !sis.closed {
- sis.samplingInSeconds()
- time.Sleep(10 * time.Second)
+ ticker := time.NewTicker(10 * time.Second)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-sis.closed:
+ return
+ case <-ticker.C:
+ sis.samplingInSeconds()
+
+ }
}
}()
go func() {
- for !sis.closed {
- sis.samplingInMinutes()
- time.Sleep(10 * time.Minute)
+ ticker := time.NewTicker(10 * time.Minute)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-sis.closed:
+ return
+ case <-ticker.C:
+ sis.samplingInMinutes()
+ }
}
}()
go func() {
- for !sis.closed {
- sis.samplingInHour()
- time.Sleep(time.Hour)
+ ticker := time.NewTicker(time.Hour)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-sis.closed:
+ return
+ case <-ticker.C:
+ sis.samplingInHour()
+ }
}
}()
go func() {
time.Sleep(nextMinutesTime().Sub(time.Now()))
- for !sis.closed {
- sis.printAtMinutes()
- time.Sleep(time.Minute)
+ ticker := time.NewTicker(time.Minute)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-sis.closed:
+ return
+ case <-ticker.C:
+ sis.printAtMinutes()
+ }
}
}()
go func() {
time.Sleep(nextHourTime().Sub(time.Now()))
- for !sis.closed {
- sis.printAtHour()
- time.Sleep(time.Hour)
+ ticker := time.NewTicker(time.Hour)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-sis.closed:
+ return
+ case <-ticker.C:
+ sis.printAtHour()
+ }
}
}()
go func() {
time.Sleep(nextMonthTime().Sub(time.Now()))
- for !sis.closed {
- sis.printAtDay()
- time.Sleep(24 * time.Hour)
+ ticker := time.NewTicker(24 * time.Hour)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-sis.closed:
+ return
+ case <-ticker.C:
+ sis.printAtDay()
+ }
}
}()
}