This is an automated email from the ASF dual-hosted git repository.

aahmed pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 163fba0  Make starting the pprof server optional. (#91)
163fba0 is described below

commit 163fba0305449f002409d6c0617abd2cec124234
Author: cckellogg <[email protected]>
AuthorDate: Mon Nov 11 20:14:11 2019 -0800

    Make starting the pprof server optional. (#91)
    
    * Make starting the pprof server optional.
    
    * Add debug option for log level.
---
 perf/perf-consumer.go  |  72 +++++++++++++++++------------------
 perf/perf-producer.go  |  63 ++++++++++++++++++------------
 perf/pulsar-perf-go.go | 101 +++++++++++++++++++++++++++++++++++++++----------
 3 files changed, 157 insertions(+), 79 deletions(-)

diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go
index a5f98f1..8fe2a86 100644
--- a/perf/perf-consumer.go
+++ b/perf/perf-consumer.go
@@ -18,15 +18,15 @@
 package main
 
 import (
-       "context"
        "encoding/json"
        "sync/atomic"
        "time"
 
-       "github.com/apache/pulsar-client-go/pulsar"
        "github.com/spf13/cobra"
 
        log "github.com/sirupsen/logrus"
+
+       "github.com/apache/pulsar-client-go/pulsar"
 )
 
 // ConsumeArgs define the parameters required by consume
@@ -36,24 +36,30 @@ type ConsumeArgs struct {
        ReceiverQueueSize int
 }
 
-var consumeArgs ConsumeArgs
+func newConsumerCommand() *cobra.Command {
+       consumeArgs := ConsumeArgs{}
+       cmd := &cobra.Command{
+               Use:   "consume <topic>",
+               Short: "Consume from topic",
+               Args:  cobra.ExactArgs(1),
+               Run: func(cmd *cobra.Command, args []string) {
+                       stop := stopCh()
+                       if FlagProfile {
+                               RunProfiling(stop)
+                       }
+                       consumeArgs.Topic = args[0]
+                       consume(&consumeArgs, stop)
+               },
+       }
 
-var cmdConsume = &cobra.Command{
-       Use:   "consume <topic>",
-       Short: "Consume from topic",
-       Args:  cobra.ExactArgs(1),
-       Run: func(cmd *cobra.Command, args []string) {
-               consumeArgs.Topic = args[0]
-               consume()
-       },
-}
+       flags := cmd.Flags()
+       flags.StringVarP(&consumeArgs.SubscriptionName, "subscription", "s", "sub", "Subscription name")
+       flags.IntVarP(&consumeArgs.ReceiverQueueSize, "receiver-queue-size", "r", 1000, "Receiver queue size")
 
-func initConsumer() {
-       cmdConsume.Flags().StringVarP(&consumeArgs.SubscriptionName, "subscription", "s", "sub", "Subscription name")
-       cmdConsume.Flags().IntVarP(&consumeArgs.ReceiverQueueSize, "receiver-queue-size", "r", 1000, "Receiver queue size")
+       return cmd
 }
 
-func consume() {
+func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) {
        b, _ := json.MarshalIndent(clientArgs, "", "  ")
        log.Info("Client config: ", string(b))
        b, _ = json.MarshalIndent(consumeArgs, "", "  ")
@@ -80,32 +86,24 @@ func consume() {
 
        defer consumer.Close()
 
-       ctx := context.Background()
-
-       var msgReceived int64
-       var bytesReceived int64
-
-       go func() {
-               for {
-                       msg, err := consumer.Receive(ctx)
-                       if err != nil {
-                               return
-                       }
-
-                       atomic.AddInt64(&msgReceived, 1)
-                       atomic.AddInt64(&bytesReceived, int64(len(msg.Payload())))
-
-                       if err := consumer.Ack(msg); err != nil {
-                               return
-                       }
-               }
-       }()
+       // keep message stats
+       msgReceived := int64(0)
+       bytesReceived := int64(0)
 
        // Print stats of the consume rate
        tick := time.NewTicker(10 * time.Second)
 
        for {
                select {
+               case cm, ok := <-consumer.Chan():
+                       if !ok {
+                               return
+                       }
+                       msgReceived++
+                       bytesReceived += int64(len(cm.Message.Payload()))
+                       if err := consumer.Ack(cm.Message); err != nil {
+                               return
+                       }
                case <-tick.C:
                        currentMsgReceived := atomic.SwapInt64(&msgReceived, 0)
                        currentBytesReceived := atomic.SwapInt64(&bytesReceived, 0)
@@ -114,6 +112,8 @@ func consume() {
 
                        log.Infof(`Stats - Consume rate: %6.1f msg/s - %6.1f Mbps`,
                                msgRate, bytesRate*8/1024/1024)
+               case <-stop:
+                       return
                }
        }
 }
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index 2728d6e..93663c9 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -22,12 +22,13 @@ import (
        "encoding/json"
        "time"
 
-       "github.com/apache/pulsar-client-go/pulsar"
        "github.com/beefsack/go-rate"
        "github.com/bmizerany/perks/quantile"
        "github.com/spf13/cobra"
 
        log "github.com/sirupsen/logrus"
+
+       "github.com/apache/pulsar-client-go/pulsar"
 )
 
 // ProduceArgs define the parameters required by produce
@@ -39,39 +40,46 @@ type ProduceArgs struct {
        ProducerQueueSize  int
 }
 
-var produceArgs ProduceArgs
-
-var cmdProduce = &cobra.Command{
-       Use:   "produce ",
-       Short: "Produce on a topic and measure performance",
-       Args:  cobra.ExactArgs(1),
-       Run: func(cmd *cobra.Command, args []string) {
-               produceArgs.Topic = args[0]
-               produce()
-       },
-}
+func newProducerCommand() *cobra.Command {
+       produceArgs := ProduceArgs{}
+       cmd := &cobra.Command{
+               Use:   "produce ",
+               Short: "Produce on a topic and measure performance",
+               Args:  cobra.ExactArgs(1),
+               Run: func(cmd *cobra.Command, args []string) {
+                       stop := stopCh()
+                       if FlagProfile {
+                               RunProfiling(stop)
+                       }
+                       produceArgs.Topic = args[0]
+                       produce(&produceArgs, stop)
+               },
+       }
 
-func initProducer() {
-       cmdProduce.Flags().IntVarP(&produceArgs.Rate, "rate", "r", 100, "Publish rate. Set to 0 to go unthrottled")
-       cmdProduce.Flags().IntVarP(&produceArgs.BatchingTimeMillis, "batching-time", "b", 1, "Batching grouping time in millis")
-       cmdProduce.Flags().IntVarP(&produceArgs.MessageSize, "size", "s", 1024, "Message size")
-       cmdProduce.Flags().IntVarP(&produceArgs.ProducerQueueSize, "queue-size", "q", 1000, "Produce queue size")
+       // add flags
+       flags := cmd.Flags()
+       flags.IntVarP(&produceArgs.Rate, "rate", "r", 100,
+               "Publish rate. Set to 0 to go unthrottled")
+       flags.IntVarP(&produceArgs.BatchingTimeMillis, "batching-time", "b", 1,
+               "Batching grouping time in millis")
+       flags.IntVarP(&produceArgs.MessageSize, "size", "s", 1024,
+               "Message size")
+       flags.IntVarP(&produceArgs.ProducerQueueSize, "queue-size", "q", 1000,
+               "Produce queue size")
+
+       return cmd
 }
 
-func produce() {
+func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
        b, _ := json.MarshalIndent(clientArgs, "", "  ")
        log.Info("Client config: ", string(b))
        b, _ = json.MarshalIndent(produceArgs, "", "  ")
        log.Info("Producer config: ", string(b))
 
-       client, err := pulsar.NewClient(pulsar.ClientOptions{
-               URL: clientArgs.ServiceURL,
-       })
-
+       client, err := NewClient()
        if err != nil {
                log.Fatal(err)
        }
-
        defer client.Close()
 
        producer, err := client.CreateProducer(pulsar.ProducerOptions{
@@ -84,7 +92,6 @@ func produce() {
        if err != nil {
                log.Fatal(err)
        }
-
        defer producer.Close()
 
        ctx := context.Background()
@@ -100,6 +107,12 @@ func produce() {
                }
 
                for {
+                       select {
+                       case <-stop:
+                               return
+                       default:
+                       }
+
                        if rateLimiter != nil {
                                rateLimiter.Wait()
                        }
@@ -126,6 +139,8 @@ func produce() {
 
        for {
                select {
+               case <-stop:
+                       return
                case <-tick.C:
                        messageRate := float64(messagesPublished) / float64(10)
                        log.Infof(`Stats - Publish rate: %6.1f msg/s - %6.1f Mbps - Latency ms: 50%% %5.1f - 95%% %5.1f - 99%% %5.1f - 99.9%% %5.1f - max %6.1f`,
diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go
index f831079..e62adf8 100644
--- a/perf/pulsar-perf-go.go
+++ b/perf/pulsar-perf-go.go
@@ -18,50 +18,113 @@
 package main
 
 import (
+       "context"
        "fmt"
-       "net"
        "net/http"
        _ "net/http/pprof"
+       "os"
+       "os/signal"
 
        "github.com/spf13/cobra"
 
        log "github.com/sirupsen/logrus"
+
+       "github.com/apache/pulsar-client-go/pulsar"
 )
 
+// global flags
+var FlagProfile bool
+var flagDebug bool
+
 type ClientArgs struct {
        ServiceURL string
 }
 
 var clientArgs ClientArgs
 
-func main() {
-       // use `go tool pprof http://localhost:3000/debug/pprof/profile` to get pprof file(cpu info)
-       // use `go tool pprof http://localhost:3000/debug/pprof/heap` to get inuse_space file
-       go func() {
-               listenAddr := net.JoinHostPort("localhost", "3000")
-               fmt.Printf("Profile server listening on %s\n", listenAddr)
-               profileRedirect := http.RedirectHandler("/debug/pprof", http.StatusSeeOther)
-               http.Handle("/", profileRedirect)
-               err := fmt.Errorf("%v", http.ListenAndServe(listenAddr, nil))
-               fmt.Println(err.Error())
-       }()
+func NewClient() (pulsar.Client, error) {
+       clientOpts := pulsar.ClientOptions{
+               URL: clientArgs.ServiceURL,
+       }
+       return pulsar.NewClient(clientOpts)
+}
 
+func initLogger(debug bool) {
        log.SetFormatter(&log.TextFormatter{
                FullTimestamp:   true,
                TimestampFormat: "15:04:05.000",
        })
-       log.SetLevel(log.InfoLevel)
+       level := log.InfoLevel
+       if debug {
+               level = log.DebugLevel
+       }
+       log.SetLevel(level)
+}
 
-       initProducer()
-       initConsumer()
+func main() {
+       rootCmd := &cobra.Command{
+               PersistentPreRun: func(cmd *cobra.Command, args []string) {
+                       initLogger(flagDebug)
+               },
+               Use: "pulsar-perf-go",
+       }
 
-       var rootCmd = &cobra.Command{Use: "pulsar-perf-go"}
-       rootCmd.Flags().StringVarP(&clientArgs.ServiceURL, "service-url", "u",
+       flags := rootCmd.PersistentFlags()
+       flags.BoolVar(&FlagProfile, "profile", false, "enable profiling")
+       flags.BoolVar(&flagDebug, "debug", false, "enable debug output")
+       flags.StringVarP(&clientArgs.ServiceURL, "service-url", "u",
                "pulsar://localhost:6650", "The Pulsar service URL")
-       rootCmd.AddCommand(cmdProduce, cmdConsume)
+
+       rootCmd.AddCommand(newProducerCommand())
+       rootCmd.AddCommand(newConsumerCommand())
 
        err := rootCmd.Execute()
        if err != nil {
-               panic("execute root cmd error, please check.")
+               fmt.Fprintf(os.Stderr, "executing command error=%+v\n", err)
+               os.Exit(1)
        }
 }
+
+func stopCh() <-chan struct{} {
+       stop := make(chan struct{})
+       signalCh := make(chan os.Signal, 1)
+       signal.Notify(signalCh, os.Interrupt)
+       go func() {
+               for {
+                       select {
+                       case <-signalCh:
+                               close(stop)
+                       }
+               }
+       }()
+       return stop
+}
+
+func RunProfiling(stop <-chan struct{}) {
+       go func() {
+               if err := serveProfiling("0.0.0.0:6060", stop); err != nil && err != http.ErrServerClosed {
+                       log.WithError(err).Error("Unable to start debug profiling server")
+               }
+       }()
+}
+
+// use `go tool pprof http://addr/debug/pprof/profile` to get pprof file(cpu info)
+// use `go tool pprof http://addr/debug/pprof/heap` to get inuse_space file
+func serveProfiling(addr string, stop <-chan struct{}) error {
+       s := http.Server{
+               Addr:    addr,
+               Handler: http.DefaultServeMux,
+       }
+       go func() {
+               <-stop
+               log.Infof("Shutting down pprof server")
+               s.Shutdown(context.Background())
+       }()
+
+       fmt.Printf("Starting pprof server at: %s\n", addr)
+       fmt.Printf("  use `go tool pprof http://%s/debug/pprof/prof` to get pprof file(cpu info)\n", addr)
+       fmt.Printf("  use `go tool pprof http://%s/debug/pprof/heap` to get inuse_space file\n", addr)
+       fmt.Println()
+
+       return s.ListenAndServe()
+}

Reply via email to