vongosling closed pull request #17: Producer benchmark
URL: https://github.com/apache/rocketmq-client-go/pull/17
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/benchmark/main.go b/benchmark/main.go
new file mode 100644
index 0000000..d268d72
--- /dev/null
+++ b/benchmark/main.go
@@ -0,0 +1,62 @@
+package main
+
+import (
+       "fmt"
+       "os"
+       "strings"
+)
+
+type command interface {
+       usage()
+       run(args []string)
+}
+
+var (
+       cmds        = map[string]command{}
+       longText    = ""
+       longTextLen = 0
+)
+
+func init() {
+       longText = strings.Repeat("0123456789", 100)
+       longTextLen = len(longText)
+}
+
+func registerCommand(name string, cmd command) {
+       if cmd == nil {
+               panic("empty command")
+       }
+
+       _, ok := cmds[name]
+       if ok {
+               panic(fmt.Sprintf("%s command existed", name))
+       }
+
+       cmds[name] = cmd
+}
+
+func usage() {
+       println(os.Args[0] + " commandName [...]")
+       for _, cmd := range cmds {
+               cmd.usage()
+       }
+}
+
+// go run *.go [command name] [command args]
+func main() {
+       if len(os.Args) < 2 {
+               println("error:lack cmd name\n")
+               usage()
+               return
+       }
+
+       name := os.Args[1]
+       cmd, ok := cmds[name]
+       if !ok {
+               fmt.Printf("command %s is not supported\n", name)
+               usage()
+               return
+       }
+
+       cmd.run(os.Args[2:])
+}
diff --git a/benchmark/producer.go b/benchmark/producer.go
new file mode 100644
index 0000000..b51df0a
--- /dev/null
+++ b/benchmark/producer.go
@@ -0,0 +1,267 @@
+package main
+
+import (
+       "flag"
+       "fmt"
+       "os"
+       "os/signal"
+       "sync"
+       "sync/atomic"
+       "syscall"
+       "time"
+
+       rocketmq "github.com/apache/rocketmq-client-go/core"
+)
+
+type statiBenchmarkProducerSnapshot struct {
+       sendRequestSuccessCount     int64
+       sendRequestFailedCount      int64
+       receiveResponseSuccessCount int64
+       receiveResponseFailedCount  int64
+       sendMessageSuccessTimeTotal int64
+       sendMessageMaxRT            int64
+       createdAt                   time.Time
+       next                        *statiBenchmarkProducerSnapshot
+}
+
+type snapshots struct {
+       sync.RWMutex
+       head, tail, cur *statiBenchmarkProducerSnapshot
+       len             int
+}
+
+func (s *snapshots) takeSnapshot() {
+       b := s.cur
+       sn := new(statiBenchmarkProducerSnapshot)
+       sn.sendRequestSuccessCount = 
atomic.LoadInt64(&b.sendRequestSuccessCount)
+       sn.sendRequestFailedCount = atomic.LoadInt64(&b.sendRequestFailedCount)
+       sn.receiveResponseSuccessCount = 
atomic.LoadInt64(&b.receiveResponseSuccessCount)
+       sn.receiveResponseFailedCount = 
atomic.LoadInt64(&b.receiveResponseFailedCount)
+       sn.sendMessageSuccessTimeTotal = 
atomic.LoadInt64(&b.sendMessageSuccessTimeTotal)
+       sn.sendMessageMaxRT = atomic.LoadInt64(&b.sendMessageMaxRT)
+       sn.createdAt = time.Now()
+
+       s.Lock()
+       if s.tail != nil {
+               s.tail.next = sn
+       }
+       s.tail = sn
+       if s.head == nil {
+               s.head = s.tail
+       }
+
+       s.len++
+       if s.len > 10 {
+               s.head = s.head.next
+               s.len--
+       }
+       s.Unlock()
+}
+
+func (s *snapshots) printStati() {
+       s.RLock()
+       if s.len < 10 {
+               s.RUnlock()
+               return
+       }
+
+       f, l := s.head, s.tail
+       respSucCount := float64(l.receiveResponseSuccessCount - 
f.receiveResponseSuccessCount)
+       sendTps := respSucCount / l.createdAt.Sub(f.createdAt).Seconds()
+       avgRT := 
float64(l.sendMessageSuccessTimeTotal-f.sendMessageSuccessTimeTotal) / 
respSucCount
+       maxRT := atomic.LoadInt64(&s.cur.sendMessageMaxRT)
+       s.RUnlock()
+
+       fmt.Printf(
+               "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d 
Response Failed: %d Total:%d\n",
+               int64(sendTps), maxRT, avgRT, l.sendRequestFailedCount, 
l.receiveResponseFailedCount, l.receiveResponseSuccessCount,
+       )
+}
+func takeSnapshot(s *snapshots, exit chan struct{}) {
+       ticker := time.NewTicker(time.Second)
+       for {
+               select {
+               case <-ticker.C:
+                       s.takeSnapshot()
+               case <-exit:
+                       ticker.Stop()
+                       return
+               }
+       }
+}
+
+func printStati(s *snapshots, exit chan struct{}) {
+       ticker := time.NewTicker(time.Second * 10)
+       for {
+               select {
+               case <-ticker.C:
+                       s.printStati()
+               case <-exit:
+                       ticker.Stop()
+                       return
+               }
+       }
+}
+
+type producer struct {
+       topic         string
+       nameSrv       string
+       groupID       string
+       instanceCount int
+       testMinutes   int
+       bodySize      int
+
+       flags *flag.FlagSet
+}
+
+func init() {
+       p := &producer{}
+       flags := flag.NewFlagSet("producer", flag.ExitOnError)
+       p.flags = flags
+
+       flags.StringVar(&p.topic, "t", "", "topic name")
+       flags.StringVar(&p.nameSrv, "n", "", "nameserver address")
+       flags.StringVar(&p.groupID, "g", "", "group id")
+       flags.IntVar(&p.instanceCount, "i", 1, "instance count")
+       flags.IntVar(&p.testMinutes, "m", 10, "test minutes")
+       flags.IntVar(&p.bodySize, "s", 32, "body size")
+
+       registerCommand("producer", p)
+}
+
+func (bp *producer) produceMsg(stati *statiBenchmarkProducerSnapshot, exit 
chan struct{}) {
+       p, err := rocketmq.NewProducer(&rocketmq.ProducerConfig{
+               ClientConfig: rocketmq.ClientConfig{GroupID: bp.groupID, 
NameServer: bp.nameSrv},
+       })
+       if err != nil {
+               fmt.Printf("new producer error:%s\n", err)
+               return
+       }
+
+       p.Start()
+       defer p.Shutdown()
+
+       topic, tag := bp.topic, "benchmark-producer"
+
+AGAIN:
+       select {
+       case <-exit:
+               return
+       default:
+       }
+
+       now := time.Now()
+       r := p.SendMessageSync(&rocketmq.Message{
+               Topic: bp.topic, Body: longText[:bp.bodySize],
+       })
+
+       if r.Status == rocketmq.SendOK {
+               atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)
+               atomic.AddInt64(&stati.sendRequestSuccessCount, 1)
+               currentRT := int64(time.Since(now) / time.Millisecond)
+               atomic.AddInt64(&stati.sendMessageSuccessTimeTotal, currentRT)
+               prevRT := atomic.LoadInt64(&stati.sendMessageMaxRT)
+               for currentRT > prevRT {
+                       if atomic.CompareAndSwapInt64(&stati.sendMessageMaxRT, 
prevRT, currentRT) {
+                               break
+                       }
+                       prevRT = atomic.LoadInt64(&stati.sendMessageMaxRT)
+               }
+               goto AGAIN
+       }
+
+       fmt.Printf("%v send message %s:%s error:%s\n", time.Now(), topic, tag, 
err.Error())
+       //if _, ok := err.(*rpc.ErrorInfo); ok { TODO
+       //atomic.AddInt64(&stati.receiveResponseFailedCount, 1)
+       //} else {
+       //atomic.AddInt64(&stati.sendRequestFailedCount, 1)
+       //}
+       goto AGAIN
+}
+
+func (bp *producer) run(args []string) {
+       bp.flags.Parse(args)
+
+       if bp.topic == "" {
+               println("empty topic")
+               bp.flags.Usage()
+               return
+       }
+
+       if bp.groupID == "" {
+               println("empty group id")
+               bp.flags.Usage()
+               return
+       }
+
+       if bp.nameSrv == "" {
+               println("empty namesrv")
+               bp.flags.Usage()
+               return
+       }
+       if bp.instanceCount <= 0 {
+               println("instance count must be positive integer")
+               bp.flags.Usage()
+               return
+       }
+       if bp.testMinutes <= 0 {
+               println("test time must be positive integer")
+               bp.flags.Usage()
+               return
+       }
+       if bp.bodySize <= 0 {
+               println("body size must be positive integer")
+               bp.flags.Usage()
+               return
+       }
+
+       stati := statiBenchmarkProducerSnapshot{}
+       snapshots := snapshots{cur: &stati}
+       exitChan := make(chan struct{})
+       wg := sync.WaitGroup{}
+
+       for i := 0; i < bp.instanceCount; i++ {
+               i := i
+               go func() {
+                       wg.Add(1)
+                       bp.produceMsg(&stati, exitChan)
+                       fmt.Printf("exit of produce %d\n", i)
+                       wg.Done()
+               }()
+       }
+
+       // snapshot
+       go func() {
+               wg.Add(1)
+               takeSnapshot(&snapshots, exitChan)
+               wg.Done()
+       }()
+
+       // print statistic
+       go func() {
+               wg.Add(1)
+               printStati(&snapshots, exitChan)
+               wg.Done()
+       }()
+
+       signalChan := make(chan os.Signal, 1)
+       signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
+       select {
+       case <-time.Tick(time.Minute * time.Duration(bp.testMinutes)):
+       case <-signalChan:
+       }
+
+       close(exitChan)
+       wg.Wait()
+       snapshots.takeSnapshot()
+       snapshots.printStati()
+       fmt.Println("TEST DONE")
+}
+
+func (bp *producer) usage() {
+       bp.flags.Usage()
+}
+
+func (bp *producer) buildMsg() string {
+       return longText[:bp.bodySize]
+}
diff --git a/core/api.go b/core/api.go
index 58c1465..1ed6c82 100644
--- a/core/api.go
+++ b/core/api.go
@@ -22,7 +22,7 @@ func Version() (version string) {
        return GetVersion()
 }
 
-type clientConfig struct {
+type ClientConfig struct {
        GroupID          string
        NameServer       string
        NameServerDomain string
@@ -32,7 +32,7 @@ type clientConfig struct {
        LogC             *LogConfig
 }
 
-func (config *clientConfig) string() string {
+func (config *ClientConfig) string() string {
        // For security, don't print Credentials.
        str := ""
        str = strJoin(str, "GroupId", config.GroupID)
@@ -55,14 +55,14 @@ func NewProducer(config *ProducerConfig) (Producer, error) {
 
 // ProducerConfig define a producer
 type ProducerConfig struct {
-       clientConfig
+       ClientConfig
        SendMsgTimeout int
        CompressLevel  int
        MaxMessageSize int
 }
 
 func (config *ProducerConfig) String() string {
-       str := "ProducerConfig=[" + config.clientConfig.string()
+       str := "ProducerConfig=[" + config.ClientConfig.string()
 
        if config.SendMsgTimeout > 0 {
                str = strJoin(str, "SendMsgTimeout", config.SendMsgTimeout)
@@ -116,7 +116,7 @@ func (mode MessageModel) String() string {
 
 // PushConsumerConfig define a new consumer.
 type PushConsumerConfig struct {
-       clientConfig
+       ClientConfig
        ThreadCount         int
        MessageBatchMaxSize int
        Model               MessageModel
@@ -124,7 +124,7 @@ type PushConsumerConfig struct {
 
 func (config *PushConsumerConfig) String() string {
        // For security, don't print Credentials.
-       str := "PushConsumerConfig=[" + config.clientConfig.string()
+       str := "PushConsumerConfig=[" + config.ClientConfig.string()
 
        if config.ThreadCount > 0 {
                str = strJoin(str, "ThreadCount", config.ThreadCount)
diff --git a/core/pull_consumer.go b/core/pull_consumer.go
index 6dcac43..fdf8d76 100644
--- a/core/pull_consumer.go
+++ b/core/pull_consumer.go
@@ -65,7 +65,7 @@ func (ps PullStatus) String() string {
 
 // PullConsumerConfig the configuration for the pull consumer
 type PullConsumerConfig struct {
-       clientConfig
+       ClientConfig
 }
 
 // DefaultPullConsumer default consumer pulling the message


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to