This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new ae7a2cf  [ISSUE #148] add consume/statistic.go unit test (#149)
ae7a2cf is described below

commit ae7a2cf084c9fc4aabfca50fcfc24a52083b52f9
Author: 高峰 <[email protected]>
AuthorDate: Wed Aug 14 14:41:58 2019 +0800

    [ISSUE #148] add consume/statistic.go unit test (#149)
---
 consumer/statistics.go      |   4 +-
 consumer/statistics_test.go | 192 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 194 insertions(+), 2 deletions(-)

diff --git a/consumer/statistics.go b/consumer/statistics.go
index b85e056..43f547a 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -38,8 +38,8 @@ var (
 
 func init() {
        topicAndGroupConsumeOKTPS = newStatsItemSet("CONSUME_OK_TPS")
-       topicAndGroupConsumeRT = newStatsItemSet("CONSUME_FAILED_TPS")
-       topicAndGroupConsumeFailedTPS = newStatsItemSet("CONSUME_RT")
+       topicAndGroupConsumeRT = newStatsItemSet("CONSUME_RT")
+       topicAndGroupConsumeFailedTPS = newStatsItemSet("CONSUME_FAILED_TPS")
        topicAndGroupPullTPS = newStatsItemSet("PULL_TPS")
        topicAndGroupPullRT = newStatsItemSet("PULL_RT")
 }
diff --git a/consumer/statistics_test.go b/consumer/statistics_test.go
new file mode 100644
index 0000000..370940f
--- /dev/null
+++ b/consumer/statistics_test.go
@@ -0,0 +1,192 @@
+package consumer
+
+import (
+       "testing"
+       "time"
+)
+
+func almostEqual(a, b float64) bool {
+       diff := abs(a - b)
+       return diff/a < 0.01
+}
+
+func abs(a float64) float64 {
+       if a > 0 {
+               return a
+       }
+       return -a
+}
+
+func TestNextMinuteTime(t *testing.T) {
+       nextMinute := nextMinutesTime()
+       minuteElapse := nextMinute.Sub(time.Now()).Minutes()
+       if !almostEqual(minuteElapse, 1.0) {
+               t.Errorf("wrong next one minute. want=%f, got=%f", 1.0, 
minuteElapse)
+       }
+}
+
+func TestNextHourTime(t *testing.T) {
+       nextHour := nextHourTime()
+       hourElapse := nextHour.Sub(time.Now()).Hours()
+       if !almostEqual(hourElapse, 1.0) {
+               t.Errorf("wrong next one hour. want=%f, got=%f", 1.0, 
hourElapse)
+       }
+}
+
+func TestIncreasePullRTGetPullRT(t *testing.T) {
+       ShutDownStatis()
+
+       tests := []struct {
+               RT        int64
+               ExpectSum int64
+       }{
+               {1, 0},
+               {1, 1},
+               {1, 2},
+               {1, 3},
+               {1, 4},
+               {1, 5},
+               {1, 6},
+               {1, 6},
+       }
+       for _, tt := range tests {
+               increasePullRT("rocketmq", "default", tt.RT)
+               topicAndGroupPullRT.samplingInSeconds()
+               snapshot := getPullRT("rocketmq", "default")
+               if snapshot.sum != tt.ExpectSum {
+                       t.Errorf("wrong Pull RT sum. want=%d, got=%d", 
tt.ExpectSum, snapshot.sum)
+               }
+       }
+}
+
+//func TestIncreaseConsumeRTGetConsumeRT(t *testing.T) {
+//     ShutDownStatis()
+//     tests := []struct {
+//             RT        int64
+//             ExpectSum int64
+//     }{
+//             {1, 0},
+//             {1, 1},
+//             {1, 2},
+//             {1, 3},
+//             {1, 4},
+//             {1, 5},
+//             {1, 6},
+//             {1, 6},
+//     }
+//     for _, tt := range tests {
+//             increaseConsumeRT("rocketmq", "default", tt.RT)
+//             topicAndGroupConsumeRT.samplingInMinutes()
+//             snapshot := getConsumeRT("rocketmq", "default")
+//             if snapshot.sum != tt.ExpectSum {
+//                     t.Errorf("wrong consume RT sum. want=%d, got=%d", 
tt.ExpectSum, snapshot.sum)
+//             }
+//     }
+//}
+
+func TestIncreasePullTPSGetPullTPS(t *testing.T) {
+       ShutDownStatis()
+       tests := []struct {
+               RT        int
+               ExpectSum int64
+       }{
+               {1, 0},
+               {1, 1},
+               {1, 2},
+               {1, 3},
+               {1, 4},
+               {1, 5},
+               {1, 6},
+               {1, 6},
+       }
+       for _, tt := range tests {
+               increasePullTPS("rocketmq", "default", tt.RT)
+               topicAndGroupPullTPS.samplingInSeconds()
+               snapshot := getPullTPS("rocketmq", "default")
+               if snapshot.sum != tt.ExpectSum {
+                       t.Errorf("wrong Pull TPS sum. want=%d, got=%d", 
tt.ExpectSum, snapshot.sum)
+               }
+       }
+}
+
+func TestIncreaseConsumeOKTPSGetConsumeOKTPS(t *testing.T) {
+       ShutDownStatis()
+       tests := []struct {
+               RT        int
+               ExpectSum int64
+       }{
+               {1, 0},
+               {1, 1},
+               {1, 2},
+               {1, 3},
+               {1, 4},
+               {1, 5},
+               {1, 6},
+               {1, 6},
+       }
+       for _, tt := range tests {
+               increaseConsumeOKTPS("rocketmq", "default", tt.RT)
+               topicAndGroupConsumeOKTPS.samplingInSeconds()
+               snapshot := getConsumeOKTPS("rocketmq", "default")
+               if snapshot.sum != tt.ExpectSum {
+                       t.Errorf("wrong Consume OK TPS sum. want=%d, got=%d", 
tt.ExpectSum, snapshot.sum)
+               }
+       }
+}
+
+func TestIncreaseConsumeFailedTPSGetConsumeFailedTPS(t *testing.T) {
+       ShutDownStatis()
+       tests := []struct {
+               RT        int
+               ExpectSum int64
+       }{
+               {1, 0},
+               {1, 1},
+               {1, 2},
+               {1, 3},
+               {1, 4},
+               {1, 5},
+               {1, 6},
+               {1, 6},
+       }
+       for _, tt := range tests {
+               increaseConsumeFailedTPS("rocketmq", "default", tt.RT)
+               topicAndGroupConsumeFailedTPS.samplingInSeconds()
+               snapshot := getConsumeFailedTPS("rocketmq", "default")
+               if snapshot.sum != tt.ExpectSum {
+                       t.Errorf("wrong Consume Failed TPS sum. want=%d, 
got=%d", tt.ExpectSum, snapshot.sum)
+               }
+       }
+}
+
+func TestGetConsumeStatus(t *testing.T) {
+       ShutDownStatis()
+       group, topic := "rocketmq", "default"
+
+       tests := []struct {
+               RT                int
+               ExpectFailMessage int64
+       }{
+               {1, 0},
+               {1, 1},
+               {1, 2},
+               {1, 3},
+               {1, 4},
+       }
+       for _, tt := range tests {
+               increasePullRT(group, topic, int64(tt.RT))
+               increasePullTPS(group, topic, tt.RT)
+               increaseConsumeRT(group, topic, int64(tt.RT))
+               increaseConsumeOKTPS(group, topic, tt.RT)
+               increaseConsumeFailedTPS(group, topic, tt.RT)
+               topicAndGroupPullRT.samplingInSeconds()
+               topicAndGroupPullTPS.samplingInSeconds()
+               topicAndGroupConsumeRT.samplingInMinutes()
+               topicAndGroupConsumeOKTPS.samplingInSeconds()
+               topicAndGroupConsumeFailedTPS.samplingInMinutes()
+               status := GetConsumeStatus(group, topic)
+               if status.ConsumeFailedMsgs != tt.ExpectFailMessage {
+                       t.Errorf("wrong ConsumeFailedMsg. want=0, got=%d", 
status.ConsumeFailedMsgs)
+               }
+       }
+}

Reply via email to