This is an automated email from the ASF dual-hosted git repository.
aahmed pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 163fba0 Make starting the pprof server optional. (#91)
163fba0 is described below
commit 163fba0305449f002409d6c0617abd2cec124234
Author: cckellogg <[email protected]>
AuthorDate: Mon Nov 11 20:14:11 2019 -0800
Make starting the pprof server optional. (#91)
* Make starting the pprof server optional.
* Add debug option for log level.
---
perf/perf-consumer.go | 72 +++++++++++++++++------------------
perf/perf-producer.go | 63 ++++++++++++++++++------------
perf/pulsar-perf-go.go | 101 +++++++++++++++++++++++++++++++++++++++----------
3 files changed, 157 insertions(+), 79 deletions(-)
diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go
index a5f98f1..8fe2a86 100644
--- a/perf/perf-consumer.go
+++ b/perf/perf-consumer.go
@@ -18,15 +18,15 @@
package main
import (
- "context"
"encoding/json"
"sync/atomic"
"time"
- "github.com/apache/pulsar-client-go/pulsar"
"github.com/spf13/cobra"
log "github.com/sirupsen/logrus"
+
+ "github.com/apache/pulsar-client-go/pulsar"
)
// ConsumeArgs define the parameters required by consume
@@ -36,24 +36,30 @@ type ConsumeArgs struct {
ReceiverQueueSize int
}
-var consumeArgs ConsumeArgs
+func newConsumerCommand() *cobra.Command {
+ consumeArgs := ConsumeArgs{}
+ cmd := &cobra.Command{
+ Use: "consume <topic>",
+ Short: "Consume from topic",
+ Args: cobra.ExactArgs(1),
+ Run: func(cmd *cobra.Command, args []string) {
+ stop := stopCh()
+ if FlagProfile {
+ RunProfiling(stop)
+ }
+ consumeArgs.Topic = args[0]
+ consume(&consumeArgs, stop)
+ },
+ }
-var cmdConsume = &cobra.Command{
- Use: "consume <topic>",
- Short: "Consume from topic",
- Args: cobra.ExactArgs(1),
- Run: func(cmd *cobra.Command, args []string) {
- consumeArgs.Topic = args[0]
- consume()
- },
-}
+ flags := cmd.Flags()
+ flags.StringVarP(&consumeArgs.SubscriptionName, "subscription", "s",
"sub", "Subscription name")
+ flags.IntVarP(&consumeArgs.ReceiverQueueSize, "receiver-queue-size",
"r", 1000, "Receiver queue size")
-func initConsumer() {
- cmdConsume.Flags().StringVarP(&consumeArgs.SubscriptionName,
"subscription", "s", "sub", "Subscription name")
- cmdConsume.Flags().IntVarP(&consumeArgs.ReceiverQueueSize,
"receiver-queue-size", "r", 1000, "Receiver queue size")
+ return cmd
}
-func consume() {
+func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) {
b, _ := json.MarshalIndent(clientArgs, "", " ")
log.Info("Client config: ", string(b))
b, _ = json.MarshalIndent(consumeArgs, "", " ")
@@ -80,32 +86,24 @@ func consume() {
defer consumer.Close()
- ctx := context.Background()
-
- var msgReceived int64
- var bytesReceived int64
-
- go func() {
- for {
- msg, err := consumer.Receive(ctx)
- if err != nil {
- return
- }
-
- atomic.AddInt64(&msgReceived, 1)
- atomic.AddInt64(&bytesReceived,
int64(len(msg.Payload())))
-
- if err := consumer.Ack(msg); err != nil {
- return
- }
- }
- }()
+ // keep message stats
+ msgReceived := int64(0)
+ bytesReceived := int64(0)
// Print stats of the consume rate
tick := time.NewTicker(10 * time.Second)
for {
select {
+ case cm, ok := <-consumer.Chan():
+ if !ok {
+ return
+ }
+ msgReceived++
+ bytesReceived += int64(len(cm.Message.Payload()))
+ if err := consumer.Ack(cm.Message); err != nil {
+ return
+ }
case <-tick.C:
currentMsgReceived := atomic.SwapInt64(&msgReceived, 0)
currentBytesReceived :=
atomic.SwapInt64(&bytesReceived, 0)
@@ -114,6 +112,8 @@ func consume() {
log.Infof(`Stats - Consume rate: %6.1f msg/s - %6.1f
Mbps`,
msgRate, bytesRate*8/1024/1024)
+ case <-stop:
+ return
}
}
}
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index 2728d6e..93663c9 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -22,12 +22,13 @@ import (
"encoding/json"
"time"
- "github.com/apache/pulsar-client-go/pulsar"
"github.com/beefsack/go-rate"
"github.com/bmizerany/perks/quantile"
"github.com/spf13/cobra"
log "github.com/sirupsen/logrus"
+
+ "github.com/apache/pulsar-client-go/pulsar"
)
// ProduceArgs define the parameters required by produce
@@ -39,39 +40,46 @@ type ProduceArgs struct {
ProducerQueueSize int
}
-var produceArgs ProduceArgs
-
-var cmdProduce = &cobra.Command{
- Use: "produce ",
- Short: "Produce on a topic and measure performance",
- Args: cobra.ExactArgs(1),
- Run: func(cmd *cobra.Command, args []string) {
- produceArgs.Topic = args[0]
- produce()
- },
-}
+func newProducerCommand() *cobra.Command {
+ produceArgs := ProduceArgs{}
+ cmd := &cobra.Command{
+ Use: "produce ",
+ Short: "Produce on a topic and measure performance",
+ Args: cobra.ExactArgs(1),
+ Run: func(cmd *cobra.Command, args []string) {
+ stop := stopCh()
+ if FlagProfile {
+ RunProfiling(stop)
+ }
+ produceArgs.Topic = args[0]
+ produce(&produceArgs, stop)
+ },
+ }
-func initProducer() {
- cmdProduce.Flags().IntVarP(&produceArgs.Rate, "rate", "r", 100,
"Publish rate. Set to 0 to go unthrottled")
- cmdProduce.Flags().IntVarP(&produceArgs.BatchingTimeMillis,
"batching-time", "b", 1, "Batching grouping time in millis")
- cmdProduce.Flags().IntVarP(&produceArgs.MessageSize, "size", "s", 1024,
"Message size")
- cmdProduce.Flags().IntVarP(&produceArgs.ProducerQueueSize,
"queue-size", "q", 1000, "Produce queue size")
+ // add flags
+ flags := cmd.Flags()
+ flags.IntVarP(&produceArgs.Rate, "rate", "r", 100,
+ "Publish rate. Set to 0 to go unthrottled")
+ flags.IntVarP(&produceArgs.BatchingTimeMillis, "batching-time", "b", 1,
+ "Batching grouping time in millis")
+ flags.IntVarP(&produceArgs.MessageSize, "size", "s", 1024,
+ "Message size")
+ flags.IntVarP(&produceArgs.ProducerQueueSize, "queue-size", "q", 1000,
+ "Produce queue size")
+
+ return cmd
}
-func produce() {
+func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
b, _ := json.MarshalIndent(clientArgs, "", " ")
log.Info("Client config: ", string(b))
b, _ = json.MarshalIndent(produceArgs, "", " ")
log.Info("Producer config: ", string(b))
- client, err := pulsar.NewClient(pulsar.ClientOptions{
- URL: clientArgs.ServiceURL,
- })
-
+ client, err := NewClient()
if err != nil {
log.Fatal(err)
}
-
defer client.Close()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
@@ -84,7 +92,6 @@ func produce() {
if err != nil {
log.Fatal(err)
}
-
defer producer.Close()
ctx := context.Background()
@@ -100,6 +107,12 @@ func produce() {
}
for {
+ select {
+ case <-stop:
+ return
+ default:
+ }
+
if rateLimiter != nil {
rateLimiter.Wait()
}
@@ -126,6 +139,8 @@ func produce() {
for {
select {
+ case <-stop:
+ return
case <-tick.C:
messageRate := float64(messagesPublished) / float64(10)
log.Infof(`Stats - Publish rate: %6.1f msg/s - %6.1f
Mbps - Latency ms: 50%% %5.1f - 95%% %5.1f - 99%% %5.1f - 99.9%% %5.1f - max
%6.1f`,
diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go
index f831079..e62adf8 100644
--- a/perf/pulsar-perf-go.go
+++ b/perf/pulsar-perf-go.go
@@ -18,50 +18,113 @@
package main
import (
+ "context"
"fmt"
- "net"
"net/http"
_ "net/http/pprof"
+ "os"
+ "os/signal"
"github.com/spf13/cobra"
log "github.com/sirupsen/logrus"
+
+ "github.com/apache/pulsar-client-go/pulsar"
)
+// global flags
+var FlagProfile bool
+var flagDebug bool
+
type ClientArgs struct {
ServiceURL string
}
var clientArgs ClientArgs
-func main() {
- // use `go tool pprof http://localhost:3000/debug/pprof/profile` to get
pprof file(cpu info)
- // use `go tool pprof http://localhost:3000/debug/pprof/heap` to get
inuse_space file
- go func() {
- listenAddr := net.JoinHostPort("localhost", "3000")
- fmt.Printf("Profile server listening on %s\n", listenAddr)
- profileRedirect := http.RedirectHandler("/debug/pprof",
http.StatusSeeOther)
- http.Handle("/", profileRedirect)
- err := fmt.Errorf("%v", http.ListenAndServe(listenAddr, nil))
- fmt.Println(err.Error())
- }()
+func NewClient() (pulsar.Client, error) {
+ clientOpts := pulsar.ClientOptions{
+ URL: clientArgs.ServiceURL,
+ }
+ return pulsar.NewClient(clientOpts)
+}
+func initLogger(debug bool) {
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
TimestampFormat: "15:04:05.000",
})
- log.SetLevel(log.InfoLevel)
+ level := log.InfoLevel
+ if debug {
+ level = log.DebugLevel
+ }
+ log.SetLevel(level)
+}
- initProducer()
- initConsumer()
+func main() {
+ rootCmd := &cobra.Command{
+ PersistentPreRun: func(cmd *cobra.Command, args []string) {
+ initLogger(flagDebug)
+ },
+ Use: "pulsar-perf-go",
+ }
- var rootCmd = &cobra.Command{Use: "pulsar-perf-go"}
- rootCmd.Flags().StringVarP(&clientArgs.ServiceURL, "service-url", "u",
+ flags := rootCmd.PersistentFlags()
+ flags.BoolVar(&FlagProfile, "profile", false, "enable profiling")
+ flags.BoolVar(&flagDebug, "debug", false, "enable debug output")
+ flags.StringVarP(&clientArgs.ServiceURL, "service-url", "u",
"pulsar://localhost:6650", "The Pulsar service URL")
- rootCmd.AddCommand(cmdProduce, cmdConsume)
+
+ rootCmd.AddCommand(newProducerCommand())
+ rootCmd.AddCommand(newConsumerCommand())
err := rootCmd.Execute()
if err != nil {
- panic("execute root cmd error, please check.")
+ fmt.Fprintf(os.Stderr, "executing command error=%+v\n", err)
+ os.Exit(1)
}
}
+
+func stopCh() <-chan struct{} {
+ stop := make(chan struct{})
+ signalCh := make(chan os.Signal, 1)
+ signal.Notify(signalCh, os.Interrupt)
+ go func() {
+ for {
+ select {
+ case <-signalCh:
+ close(stop)
+ }
+ }
+ }()
+ return stop
+}
+
+func RunProfiling(stop <-chan struct{}) {
+ go func() {
+ if err := serveProfiling("0.0.0.0:6060", stop); err != nil &&
err != http.ErrServerClosed {
+ log.WithError(err).Error("Unable to start debug
profiling server")
+ }
+ }()
+}
+
+// use `go tool pprof http://addr/debug/pprof/profile` to get pprof file(cpu
info)
+// use `go tool pprof http://addr/debug/pprof/heap` to get inuse_space file
+func serveProfiling(addr string, stop <-chan struct{}) error {
+ s := http.Server{
+ Addr: addr,
+ Handler: http.DefaultServeMux,
+ }
+ go func() {
+ <-stop
+ log.Infof("Shutting down pprof server")
+ s.Shutdown(context.Background())
+ }()
+
+ fmt.Printf("Starting pprof server at: %s\n", addr)
+ fmt.Printf(" use `go tool pprof http://%s/debug/pprof/prof` to get
pprof file(cpu info)\n", addr)
+ fmt.Printf(" use `go tool pprof http://%s/debug/pprof/heap` to get
inuse_space file\n", addr)
+ fmt.Println()
+
+ return s.ListenAndServe()
+}