This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new a7b2db6 send csListLock *sync.Mutex to computeStatsData for lock csList (#820)
a7b2db6 is described below
commit a7b2db6b743cddf70d3d11620e7e7f6a89aa84ea
Author: liumiaomiao <[email protected]>
AuthorDate: Thu Jul 21 17:29:59 2022 +0800
send csListLock *sync.Mutex to computeStatsData for lock csList (#820)
---
consumer/statistics.go | 16 +++++++---------
consumer/statistics_test.go | 14 ++++++++++++++
2 files changed, 21 insertions(+), 9 deletions(-)
diff --git a/consumer/statistics.go b/consumer/statistics.go
index e9d5d79..607c6ee 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -134,9 +134,7 @@ func (mgr *StatsManager) getConsumeFailedTPS(group, topic string) statsSnapshot
return mgr.topicAndGroupConsumeFailedTPS.getStatsDataInMinute(topic + "@" + group)
}
-var csListLock sync.Mutex
-
-func computeStatsData(csList *list.List) statsSnapshot {
+func computeStatsData(csListLock *sync.Mutex, csList *list.List) statsSnapshot {
csListLock.Lock()
defer csListLock.Unlock()
tps, avgpt, sum := 0.0, 0.0, int64(0)
@@ -362,15 +360,15 @@ type statsItem struct {
}
func (si *statsItem) getStatsDataInMinute() statsSnapshot {
- return computeStatsData(si.csListMinute)
+ return computeStatsData(&si.csListMinuteLock, si.csListMinute)
}
func (si *statsItem) getStatsDataInHour() statsSnapshot {
- return computeStatsData(si.csListHour)
+ return computeStatsData(&si.csListHourLock, si.csListHour)
}
func (si *statsItem) getStatsDataInDay() statsSnapshot {
- return computeStatsData(si.csListDay)
+ return computeStatsData(&si.csListDayLock, si.csListDay)
}
func newStatsItem(statsName, statsKey string) *statsItem {
@@ -423,7 +421,7 @@ func (si *statsItem) samplingInHour() {
}
func (si *statsItem) printAtMinutes() {
- ss := computeStatsData(si.csListMinute)
+ ss := computeStatsData(&si.csListMinuteLock, si.csListMinute)
rlog.Info("Stats In One Minute, SUM: %d TPS: AVGPT: %.2f",
map[string]interface{}{
"statsName": si.statsName,
"statsKey": si.statsKey,
@@ -434,7 +432,7 @@ func (si *statsItem) printAtMinutes() {
}
func (si *statsItem) printAtHour() {
- ss := computeStatsData(si.csListHour)
+ ss := computeStatsData(&si.csListHourLock, si.csListHour)
rlog.Info("Stats In One Hour, SUM: %d TPS: AVGPT: %.2f",
map[string]interface{}{
"statsName": si.statsName,
"statsKey": si.statsKey,
@@ -445,7 +443,7 @@ func (si *statsItem) printAtHour() {
}
func (si *statsItem) printAtDay() {
- ss := computeStatsData(si.csListDay)
+ ss := computeStatsData(&si.csListDayLock, si.csListDay)
rlog.Info("Stats In One Day, SUM: %d TPS: AVGPT: %.2f",
map[string]interface{}{
"statsName": si.statsName,
"statsKey": si.statsKey,
diff --git a/consumer/statistics_test.go b/consumer/statistics_test.go
index 4836141..930f0a3 100644
--- a/consumer/statistics_test.go
+++ b/consumer/statistics_test.go
@@ -212,3 +212,17 @@ func TestGetConsumeStatus(t *testing.T) {
}
}
}
+
+func TestNewStatsManager(t *testing.T) {
+ stats := NewStatsManager()
+
+ st := time.Now()
+ for {
+ stats.increasePullTPS("rocketmq", "default", 1)
+ time.Sleep(500*time.Millisecond)
+ if time.Now().Sub(st) > 5*time.Minute {
+ break
+ }
+ }
+ stats.ShutDownStat()
+}