This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a7b2db6  send csListLock *sync.Mutex to computeStatsData for lock csList (#820)
a7b2db6 is described below

commit a7b2db6b743cddf70d3d11620e7e7f6a89aa84ea
Author: liumiaomiao <[email protected]>
AuthorDate: Thu Jul 21 17:29:59 2022 +0800

    send csListLock *sync.Mutex to computeStatsData for lock csList (#820)
---
 consumer/statistics.go      | 16 +++++++---------
 consumer/statistics_test.go | 14 ++++++++++++++
 2 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/consumer/statistics.go b/consumer/statistics.go
index e9d5d79..607c6ee 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -134,9 +134,7 @@ func (mgr *StatsManager) getConsumeFailedTPS(group, topic string) statsSnapshot
        return mgr.topicAndGroupConsumeFailedTPS.getStatsDataInMinute(topic + "@" + group)
 }
 
-var csListLock sync.Mutex
-
-func computeStatsData(csList *list.List) statsSnapshot {
+func computeStatsData(csListLock *sync.Mutex, csList *list.List) statsSnapshot {
        csListLock.Lock()
        defer csListLock.Unlock()
        tps, avgpt, sum := 0.0, 0.0, int64(0)
@@ -362,15 +360,15 @@ type statsItem struct {
 }
 
 func (si *statsItem) getStatsDataInMinute() statsSnapshot {
-       return computeStatsData(si.csListMinute)
+       return computeStatsData(&si.csListMinuteLock, si.csListMinute)
 }
 
 func (si *statsItem) getStatsDataInHour() statsSnapshot {
-       return computeStatsData(si.csListHour)
+       return computeStatsData(&si.csListHourLock, si.csListHour)
 }
 
 func (si *statsItem) getStatsDataInDay() statsSnapshot {
-       return computeStatsData(si.csListDay)
+       return computeStatsData(&si.csListDayLock, si.csListDay)
 }
 
 func newStatsItem(statsName, statsKey string) *statsItem {
@@ -423,7 +421,7 @@ func (si *statsItem) samplingInHour() {
 }
 
 func (si *statsItem) printAtMinutes() {
-       ss := computeStatsData(si.csListMinute)
+       ss := computeStatsData(&si.csListMinuteLock, si.csListMinute)
        rlog.Info("Stats In One Minute, SUM: %d TPS:  AVGPT: %.2f", map[string]interface{}{
                "statsName": si.statsName,
                "statsKey":  si.statsKey,
@@ -434,7 +432,7 @@ func (si *statsItem) printAtMinutes() {
 }
 
 func (si *statsItem) printAtHour() {
-       ss := computeStatsData(si.csListHour)
+       ss := computeStatsData(&si.csListHourLock, si.csListHour)
        rlog.Info("Stats In One Hour, SUM: %d TPS:  AVGPT: %.2f", map[string]interface{}{
                "statsName": si.statsName,
                "statsKey":  si.statsKey,
@@ -445,7 +443,7 @@ func (si *statsItem) printAtHour() {
 }
 
 func (si *statsItem) printAtDay() {
-       ss := computeStatsData(si.csListDay)
+       ss := computeStatsData(&si.csListDayLock, si.csListDay)
        rlog.Info("Stats In One Day, SUM: %d TPS:  AVGPT: %.2f", map[string]interface{}{
                "statsName": si.statsName,
                "statsKey":  si.statsKey,
diff --git a/consumer/statistics_test.go b/consumer/statistics_test.go
index 4836141..930f0a3 100644
--- a/consumer/statistics_test.go
+++ b/consumer/statistics_test.go
@@ -212,3 +212,17 @@ func TestGetConsumeStatus(t *testing.T) {
                }
        }
 }
+
+func TestNewStatsManager(t *testing.T) {
+       stats := NewStatsManager()
+
+       st := time.Now()
+       for  {
+               stats.increasePullTPS("rocketmq", "default", 1)
+               time.Sleep(500*time.Millisecond)
+               if time.Now().Sub(st) > 5*time.Minute {
+                       break
+               }
+       }
+       stats.ShutDownStat()
+}

Reply via email to