This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 220dcb4 Export client metrics to Prometheus (#317)
220dcb4 is described below
commit 220dcb40d531c16e59d323d420ee8f00492001dd
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Jul 10 12:11:30 2020 -0700
Export client metrics to Prometheus (#317)
* Export client metrics to Prometheus
* Added missing messagesReceived.Inc()
---
go.mod | 4 +-
go.sum | 84 +++++++++++++++++++++++++++++++++++++--
perf/pulsar-perf-go.go | 12 ++++++
pulsar/consumer_impl.go | 29 +++++++++++++-
pulsar/consumer_multitopic.go | 2 +
pulsar/consumer_partition.go | 60 ++++++++++++++++++++++++++++
pulsar/consumer_regex.go | 2 +
pulsar/impl_message.go | 6 ++-
pulsar/internal/connection.go | 30 ++++++++++++++
pulsar/internal/lookup_service.go | 11 +++++
pulsar/internal/rpc_client.go | 13 ++++++
pulsar/producer_impl.go | 24 +++++++++++
pulsar/producer_partition.go | 55 ++++++++++++++++++++++++-
pulsar/reader_impl.go | 17 ++++++++
14 files changed, 340 insertions(+), 9 deletions(-)
diff --git a/go.mod b/go.mod
index f26abd0..afae8cc 100644
--- a/go.mod
+++ b/go.mod
@@ -9,9 +9,11 @@ require (
github.com/gogo/protobuf v1.3.1
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.10.8
+ github.com/kr/pretty v0.2.0 // indirect
github.com/pierrec/lz4 v2.0.5+incompatible
github.com/pkg/errors v0.8.1
- github.com/sirupsen/logrus v1.4.1
+ github.com/prometheus/client_golang v1.7.1
+ github.com/sirupsen/logrus v1.4.2
github.com/spaolacci/murmur3 v1.1.0
github.com/spf13/cobra v0.0.3
github.com/spf13/pflag v1.0.3 // indirect
diff --git a/go.sum b/go.sum
index 302cf5b..9f51dbb 100644
--- a/go.sum
+++ b/go.sum
@@ -1,46 +1,106 @@
github.com/BurntSushi/toml v0.3.1/go.mod
h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32
h1:/gZKpgSMydtrih81nvUhlkXpZIUfthKShSCVbRzBt9Y=
github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32/go.mod
h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
+github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod
h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod
h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/ardielle/ardielle-go v1.5.2
h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
github.com/ardielle/ardielle-go v1.5.2/go.mod
h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
github.com/ardielle/ardielle-tools v1.5.4/go.mod
h1:oZN+JRMnqGiIhrzkRN9l26Cej9dEx4jeNG6A+AdkShk=
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6
h1:KXlsf+qt/X5ttPGEjR0tPH1xaWWoKBEg9Q1THAj2h3I=
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod
h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
+github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod
h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
+github.com/beorn7/perks v1.0.0/go.mod
h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
+github.com/beorn7/perks v1.0.1/go.mod
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod
h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
github.com/boynton/repl v0.0.0-20170116235056-348863958e3e/go.mod
h1:Crc/GCZ3NXDVCio7Yr0o+SSrytpcFhLmVCIzi0s49t4=
+github.com/cespare/xxhash/v2 v2.1.1
h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
+github.com/cespare/xxhash/v2 v2.1.1/go.mod
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dimfeld/httptreemux v5.0.1+incompatible
h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod
h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
+github.com/go-kit/kit v0.8.0/go.mod
h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
+github.com/go-kit/kit v0.9.0/go.mod
h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
+github.com/go-logfmt/logfmt v0.3.0/go.mod
h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
+github.com/go-logfmt/logfmt v0.4.0/go.mod
h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
+github.com/go-stack/stack v1.8.0/go.mod
h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/gogo/protobuf v1.1.1/go.mod
h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod
h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
+github.com/golang/protobuf v1.2.0/go.mod
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.1/go.mod
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2/go.mod
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.4.0-rc.1/go.mod
h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
+github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod
h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
+github.com/golang/protobuf v1.4.0-rc.2/go.mod
h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
+github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod
h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
+github.com/golang/protobuf v1.4.0/go.mod
h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.2
h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
+github.com/golang/protobuf v1.4.2/go.mod
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.3.0/go.mod
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/gofuzz v1.0.0/go.mod
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gorilla/context v1.1.1/go.mod
h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.7.3/go.mod
h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/inconshreveable/mousetrap v1.0.0
h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod
h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jawher/mow.cli v1.0.4/go.mod
h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk=
github.com/jawher/mow.cli v1.1.0/go.mod
h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6PyuRJwlUg=
+github.com/json-iterator/go v1.1.6/go.mod
h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
+github.com/json-iterator/go v1.1.10/go.mod
h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
+github.com/julienschmidt/httprouter v1.2.0/go.mod
h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/errcheck v1.2.0/go.mod
h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod
h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
-github.com/klauspost/compress v1.10.5
h1:7q6vHIqubShURwQz8cQK6yIe/xC3IF0Vm7TGfqjewrc=
-github.com/klauspost/compress v1.10.5/go.mod
h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.10.8
h1:eLeJ3dr/Y9+XRfJT4l+8ZjmtB5RPJhucH2HeCV5+IZY=
github.com/klauspost/compress v1.10.8/go.mod
h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/konsorten/go-windows-terminal-sequences v1.0.1
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod
h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
+github.com/kr/pretty v0.1.0/go.mod
h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
+github.com/kr/pretty v0.2.0/go.mod
h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod
h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/matttproud/golang_protobuf_extensions v1.0.1
h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
+github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod
h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod
h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/modern-go/reflect2 v1.0.1/go.mod
h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod
h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/pierrec/lz4 v2.0.5+incompatible
h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod
h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
+github.com/pkg/errors v0.8.0/go.mod
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/sirupsen/logrus v1.4.1
h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
-github.com/sirupsen/logrus v1.4.1/go.mod
h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
+github.com/prometheus/client_golang v0.9.1/go.mod
h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
+github.com/prometheus/client_golang v1.0.0/go.mod
h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
+github.com/prometheus/client_golang v1.7.1
h1:NTGy1Ja9pByO+xAeH/qiWnLrKtr3hJPNjaVUwnjpdpA=
+github.com/prometheus/client_golang v1.7.1/go.mod
h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
+github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod
h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
+github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod
h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/client_model v0.2.0
h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
+github.com/prometheus/client_model v0.2.0/go.mod
h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/common v0.4.1/go.mod
h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
+github.com/prometheus/common v0.10.0
h1:RyRA7RzGXQZiW+tGMr7sxa85G1z0yOpM1qq5c8lNawc=
+github.com/prometheus/common v0.10.0/go.mod
h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
+github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod
h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
+github.com/prometheus/procfs v0.0.2/go.mod
h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
+github.com/prometheus/procfs v0.1.3
h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
+github.com/prometheus/procfs v0.1.3/go.mod
h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
+github.com/sirupsen/logrus v1.2.0/go.mod
h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.2
h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
+github.com/sirupsen/logrus v1.4.2/go.mod
h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/spaolacci/murmur3 v1.1.0
h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod
h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
@@ -58,24 +118,36 @@ github.com/stretchr/testify v1.4.0
h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod
h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/yahoo/athenz v1.8.55 h1:xGhxN3yLq334APyn0Zvcc+aqu78Q7BBhYJevM3EtTW0=
github.com/yahoo/athenz v1.8.55/go.mod
h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33
h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa
h1:KIDDMLT1O0Nr7TSxp8xM5tJcdn8tgyAONntO829og1M=
golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1
h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
+golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190808195139-e713427fea3f/go.mod
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543
h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod
h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod
h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
@@ -84,11 +156,15 @@ google.golang.org/protobuf
v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE
google.golang.org/protobuf v1.21.0/go.mod
h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0
h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod
h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15
h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod
h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/square/go-jose.v2 v2.4.1/go.mod
h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
+gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go
index 3cd1d52..8fc0e09 100644
--- a/perf/pulsar-perf-go.go
+++ b/perf/pulsar-perf-go.go
@@ -24,17 +24,20 @@ import (
_ "net/http/pprof"
"os"
"os/signal"
+ "strconv"
"github.com/spf13/cobra"
log "github.com/sirupsen/logrus"
"github.com/apache/pulsar-client-go/pulsar"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
)
// FlagProfile is a global flag
var FlagProfile bool
var flagDebug bool
+var PrometheusPort int
type ClientArgs struct {
ServiceURL string
@@ -71,6 +74,7 @@ func main() {
flags := rootCmd.PersistentFlags()
flags.BoolVar(&FlagProfile, "profile", false, "enable profiling")
+ flags.IntVar(&PrometheusPort, "metrics", 8000, "Port to use to export metrics for Prometheus. Use -1 to disable.")
flags.BoolVar(&flagDebug, "debug", false, "enable debug output")
flags.StringVarP(&clientArgs.ServiceURL, "service-url", "u",
"pulsar://localhost:6650", "The Pulsar service URL")
@@ -78,6 +82,14 @@ func main() {
rootCmd.AddCommand(newProducerCommand())
rootCmd.AddCommand(newConsumerCommand())
+ if PrometheusPort > 0 {
+ go func() {
+ log.Info("Starting Prometheus metrics at http://localhost:", PrometheusPort, "/metrics")
+ http.Handle("/metrics", promhttp.Handler())
+ http.ListenAndServe(":"+strconv.Itoa(PrometheusPort), nil)
+ }()
+ }
+
err := rootCmd.Execute()
if err != nil {
fmt.Fprintf(os.Stderr, "executing command error=%+v\n", err)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index feebcf2..e3db670 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -25,12 +25,32 @@ import (
"sync"
"time"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+
log "github.com/sirupsen/logrus"
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)
+var (
+ consumersOpened = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_consumers_opened",
+ Help: "Counter of consumers created by the client",
+ })
+
+ consumersClosed = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_consumers_closed",
+ Help: "Counter of consumers closed by the client",
+ })
+
+ consumersPartitions = promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "pulsar_client_consumers_partitions_active",
+ Help: "Counter of individual partitions the consumers are currently active",
+ })
+)
+
var ErrConsumerClosed = errors.New("consumer closed")
const defaultNackRedeliveryDelay = 1 * time.Minute
@@ -276,12 +296,17 @@ func (c *consumer) internalTopicSubscribeToPartitions()
error {
return err
}
+ consumersPartitions.Add(float64(partitionsToAdd))
return nil
}
func topicSubscribe(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlqRouter *dlqRouter) (Consumer, error)
{
- return newInternalConsumer(client, options, topic, messageCh, dlqRouter, false)
+ c, err := newInternalConsumer(client, options, topic, messageCh, dlqRouter, false)
+ if err == nil {
+ consumersOpened.Inc()
+ }
+ return c, err
}
func (c *consumer) Subscription() string {
@@ -381,6 +406,8 @@ func (c *consumer) Close() {
c.ticker.Stop()
c.client.handlers.Del(c)
c.dlq.close()
+ consumersClosed.Inc()
+ consumersPartitions.Sub(float64(len(c.consumers)))
})
}
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index ec4d57a..f526487 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -72,6 +72,7 @@ func newMultiTopicConsumer(client *client, options
ConsumerOptions, topics []str
return nil, errs
}
+ consumersOpened.Inc()
return mtc, nil
}
@@ -165,6 +166,7 @@ func (c *multiTopicConsumer) Close() {
wg.Wait()
close(c.closeCh)
c.dlq.close()
+ consumersClosed.Inc()
})
}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index e539e51..acf897d 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -23,6 +23,9 @@ import (
"sync"
"time"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+
"github.com/gogo/protobuf/proto"
log "github.com/sirupsen/logrus"
@@ -32,6 +35,49 @@ import (
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)
+var (
+ messagesReceived = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_messages_received",
+ Help: "Counter of messages received by the client",
+ })
+
+ bytesReceived = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_bytes_received",
+ Help: "Counter of bytes received by the client",
+ })
+
+ prefetchedMessages = promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "pulsar_client_consumer_prefetched_messages",
+ Help: "Number of messages currently sitting in the consumer pre-fetch queue",
+ })
+
+ prefetchedBytes = promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "pulsar_client_consumer_prefetched_bytes",
+ Help: "Total number of bytes currently sitting in the consumer pre-fetch queue",
+ })
+
+ acksCounter = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_consumer_acks",
+ Help: "Counter of messages acked by client",
+ })
+
+ nacksCounter = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_consumer_nacks",
+ Help: "Counter of messages nacked by client",
+ })
+
+ dlqCounter = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_consumer_dlq_messages",
+ Help: "Counter of messages sent to Dead letter queue",
+ })
+
+ processingTime = promauto.NewHistogram(prometheus.HistogramOpts{
+ Name: "pulsar_client_consumer_processing_time_seconds",
+ Help: "Time it takes for application to process messages",
+ Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
+ })
+)
+
type consumerState int
const (
@@ -222,6 +268,8 @@ func (pc *partitionConsumer) internalGetLastMessageID(req
*getLastMsgIDRequest)
func (pc *partitionConsumer) AckID(msgID messageID) {
if !msgID.IsZero() && msgID.ack() {
+ acksCounter.Inc()
+ processingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9)
req := &ackRequest{
msgID: msgID,
}
@@ -231,6 +279,7 @@ func (pc *partitionConsumer) AckID(msgID messageID) {
func (pc *partitionConsumer) NackID(msgID messageID) {
pc.nackTracker.Add(msgID)
+ nacksCounter.Inc()
}
func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
@@ -390,6 +439,10 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
if numMsgs > 1 {
ackTracker = newAckTracker(numMsgs)
}
+
+ messagesReceived.Add(float64(numMsgs))
+ prefetchedMessages.Add(float64(numMsgs))
+
for i := 0; i < numMsgs; i++ {
smm, payload, err := reader.ReadMessage()
if err != nil {
@@ -397,6 +450,9 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
return err
}
+ bytesReceived.Add(float64(len(payload)))
+ prefetchedBytes.Add(float64(len(payload)))
+
msgID := newTrackingMessageID(
int64(pbMsgID.GetLedgerId()),
int64(pbMsgID.GetEntryId()),
@@ -507,11 +563,15 @@ func (pc *partitionConsumer) dispatcher() {
if pc.dlq.shouldSendToDlq(&nextMessage) {
// pass the message to the DLQ router
+ dlqCounter.Inc()
messageCh = pc.dlq.Chan()
} else {
// pass the message to application channel
messageCh = pc.messageCh
}
+
+ prefetchedMessages.Dec()
+ prefetchedBytes.Sub(float64(len(messages[0].payLoad)))
} else {
// we are ready for more messages
queueCh = pc.queueCh
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index ff7cbca..a6dfd56 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -114,6 +114,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn
*internal.TopicName, p
go rc.monitor()
+ consumersOpened.Inc()
return rc, nil
}
@@ -214,6 +215,7 @@ func (c *regexConsumer) Close() {
}
wg.Wait()
c.dlq.close()
+ consumersClosed.Inc()
})
}
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index f1a9a7c..562dfb6 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -34,8 +34,9 @@ type messageID struct {
batchIdx int32
partitionIdx int32
- tracker *ackTracker
- consumer acker
+ tracker *ackTracker
+ consumer acker
+ receivedTime time.Time
}
func (id messageID) IsZero() bool {
@@ -130,6 +131,7 @@ func newTrackingMessageID(ledgerID int64, entryID int64,
batchIdx int32, partiti
batchIdx: batchIdx,
partitionIdx: partitionIdx,
tracker: tracker,
+ receivedTime: time.Now(),
}
}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 06d1543..cc770bb 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -29,6 +29,9 @@ import (
"sync/atomic"
"time"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+
"github.com/gogo/protobuf/proto"
log "github.com/sirupsen/logrus"
@@ -44,6 +47,28 @@ const (
PulsarProtocolVersion = int32(pb.ProtocolVersion_v13)
)
+var (
+ connectionsOpened = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_connections_opened",
+ Help: "Counter of connections created by the client",
+ })
+
+ connectionsClosed = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_connections_closed",
+ Help: "Counter of connections closed by the client",
+ })
+
+ connectionsEstablishmentErrors = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_connections_establishment_errors",
+ Help: "Counter of errors in connections establishment",
+ })
+
+ connectionsHandshakeErrors = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_connections_handshake_errors",
+ Help: "Counter of errors in connections handshake (eg: authz)",
+ })
+)
+
type TLSOptions struct {
TrustCertsFilePath string
AllowInsecureConnection bool
@@ -201,11 +226,14 @@ func (c *connection) start() {
go func() {
if c.connect() {
if c.doHandshake() {
+ connectionsOpened.Inc()
c.run()
} else {
+ connectionsHandshakeErrors.Inc()
c.changeState(connectionClosed)
}
} else {
+ connectionsEstablishmentErrors.Inc()
c.changeState(connectionClosed)
}
}()
@@ -695,6 +723,8 @@ func (c *connection) Close() {
for _, handler := range consumerHandlers {
handler.ConnectionClosed()
}
+
+ connectionsClosed.Inc()
}
func (c *connection) changeState(state connectionState) {
diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go
index 9537c57..1673f4d 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -22,12 +22,22 @@ import (
"fmt"
"net/url"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/gogo/protobuf/proto"
log "github.com/sirupsen/logrus"
)
+var (
+ lookupRequestsCount = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_lookup_count",
+ Help: "Counter of lookup requests made by the client",
+ })
+)
+
// LookupResult encapsulates a struct for lookup a request, containing two
parts: LogicalAddr, PhysicalAddr.
type LookupResult struct {
LogicalAddr *url.URL
@@ -82,6 +92,7 @@ func (ls *lookupService) getBrokerAddress(lr
*pb.CommandLookupTopicResponse) (lo
const lookupResultMaxRedirect = 20
func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
+ lookupRequestsCount.Inc()
id := ls.rpcClient.NewRequestID()
res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_LOOKUP,
&pb.CommandLookupTopic{
RequestId: &id,
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index ad896ed..0d4cc12 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -24,12 +24,22 @@ import (
"sync/atomic"
"time"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/gogo/protobuf/proto"
log "github.com/sirupsen/logrus"
)
+var (
+ rpcRequestCount = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_rpc_count",
+ Help: "Counter of RPC requests made by the client",
+ })
+)
+
type RPCResult struct {
Response *pb.BaseCommand
Cnx Connection
@@ -81,6 +91,7 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64,
cmdType pb.BaseCommand_
func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL,
requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
{
+ rpcRequestCount.Inc()
cnx, err := c.getConn(logicalAddr, physicalAddr)
if err != nil {
return nil, err
@@ -132,6 +143,7 @@ func (c *rpcClient) getConn(logicalAddr *url.URL,
physicalAddr *url.URL) (Connec
func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType
pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
+ rpcRequestCount.Inc()
wg := sync.WaitGroup{}
wg.Add(1)
@@ -151,6 +163,7 @@ func (c *rpcClient) RequestOnCnx(cnx Connection, requestID
uint64, cmdType pb.Ba
}
func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType
pb.BaseCommand_Type, message proto.Message) {
+ rpcRequestCount.Inc()
cnx.SendRequestNoWait(baseCommand(cmdType, message))
}
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 35dae28..01c5d76 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -24,11 +24,31 @@ import (
"time"
"unsafe"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+
log "github.com/sirupsen/logrus"
"github.com/apache/pulsar-client-go/pulsar/internal"
)
+var (
+ producersOpened = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_producers_opened",
+ Help: "Counter of producers created by the client",
+ })
+
+ producersClosed = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_producers_closed",
+ Help: "Counter of producers closed by the client",
+ })
+
+ producersPartitions = promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "pulsar_client_producers_partitions_active",
+ Help: "Counter of individual partitions the producers are currently active",
+ })
+)
+
type producer struct {
sync.RWMutex
client *client
@@ -103,6 +123,7 @@ func newProducer(client *client, options *ProducerOptions)
(*producer, error) {
}
}()
+ producersOpened.Inc()
return p, nil
}
@@ -182,6 +203,7 @@ func (p *producer) internalCreatePartitionsProducers()
error {
return err
}
+ producersPartitions.Add(float64(partitionsToAdd))
atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers))
atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers)))
return nil
@@ -262,4 +284,6 @@ func (p *producer) Close() {
pp.Close()
}
p.client.handlers.Del(p)
+ producersPartitions.Sub(float64(len(p.producers)))
+ producersClosed.Inc()
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 9243804..b8dc13d 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -24,6 +24,9 @@ import (
"sync/atomic"
"time"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
"github.com/gogo/protobuf/proto"
@@ -49,6 +52,39 @@ var (
buffersPool sync.Pool
)
+var (
+ messagesPublished = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_messages_published",
+ Help: "Counter of messages published by the client",
+ })
+
+ bytesPublished = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_bytes_published",
+ Help: "Counter of bytes published by the client",
+ })
+
+ messagesPending = promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "pulsar_client_producer_pending_messages",
+ Help: "Number of messages pending to be published by the
client",
+ })
+
+ bytesPending = promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "pulsar_client_producer_pending_bytes",
+ Help: "Number of bytes pending to be published by the client",
+ })
+
+ publishErrors = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_producer_errors",
+ Help: "Counter of publish errors",
+ })
+
+ publishLatency = promauto.NewHistogram(prometheus.HistogramOpts{
+ Name: "pulsar_client_producer_latency_seconds",
+ Help: "Publish latency experienced by the client",
+ Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25,
.5, 1, 2.5, 5, 10},
+ })
+)
+
type partitionProducer struct {
state int32
client *client
@@ -261,6 +297,7 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
p.log.WithField("size", len(msg.Payload)).
WithField("properties", msg.Properties).
WithError(errMessageTooLarge).Error()
+ publishErrors.Inc()
return
}
@@ -399,13 +436,19 @@ func (p *partitionProducer) SendAsync(ctx
context.Context, msg *ProducerMessage,
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg
*ProducerMessage,
callback func(MessageID, *ProducerMessage, error), flushImmediately
bool) {
- p.publishSemaphore.Acquire()
+
sr := &sendRequest{
ctx: ctx,
msg: msg,
callback: callback,
flushImmediately: flushImmediately,
+ publishTime: time.Now(),
}
+
+ messagesPending.Inc()
+ bytesPending.Add(float64(len(sr.msg.Payload)))
+
+ p.publishSemaphore.Acquire()
p.eventsChan <- sr
}
@@ -426,6 +469,8 @@ func (p *partitionProducer) ReceivedSendReceipt(response
*pb.CommandSendReceipt)
// The ack was indeed for the expected item in the queue, we can remove
it and trigger the callback
p.pendingQueue.Poll()
+ now := time.Now().UnixNano()
+
// lock the pending item while sending the requests
pi.Lock()
defer pi.Unlock()
@@ -434,6 +479,13 @@ func (p *partitionProducer) ReceivedSendReceipt(response
*pb.CommandSendReceipt)
if sr.msg != nil {
atomic.StoreInt64(&p.lastSequenceID,
int64(pi.sequenceID))
p.publishSemaphore.Release()
+
+
publishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
+ messagesPublished.Inc()
+ messagesPending.Dec()
+ payloadSize := float64(len(sr.msg.Payload))
+ bytesPublished.Add(payloadSize)
+ bytesPending.Sub(payloadSize)
}
if sr.callback != nil {
@@ -516,6 +568,7 @@ type sendRequest struct {
ctx context.Context
msg *ProducerMessage
callback func(MessageID, *ProducerMessage, error)
+ publishTime time.Time
flushImmediately bool
}
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index b74b35b..d97cc96 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -20,6 +20,9 @@ package pulsar
import (
"context"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+
log "github.com/sirupsen/logrus"
)
@@ -27,6 +30,18 @@ const (
defaultReceiverQueueSize = 1000
)
+var (
+ readersOpened = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_readers_opened",
+ Help: "Counter of readers created by the client",
+ })
+
+ readersClosed = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "pulsar_client_readers_closed",
+ Help: "Counter of readers closed by the client",
+ })
+)
+
type reader struct {
pc *partitionConsumer
messageCh chan ConsumerMessage
@@ -101,6 +116,7 @@ func newReader(client *client, options ReaderOptions)
(Reader, error) {
}
reader.pc = pc
+ readersOpened.Inc()
return reader, nil
}
@@ -162,4 +178,5 @@ func (r *reader) hasMoreMessages() bool {
func (r *reader) Close() {
r.pc.Close()
+ readersClosed.Inc()
}