This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 1a3ad70 Fix missing metrics for topics by registration of existing collector (#600)
1a3ad70 is described below
commit 1a3ad70403aa04feda83edcfd984e25cef2dcfe6
Author: pkutilina <[email protected]>
AuthorDate: Wed Aug 25 20:18:18 2021 +0200
Fix missing metrics for topics by registration of existing collector (#600)
---
pulsar/internal/metrics.go | 213 ++++++++++++++++++++++++++++++++++++++-------
1 file changed, 180 insertions(+), 33 deletions(-)
diff --git a/pulsar/internal/metrics.go b/pulsar/internal/metrics.go
index d44522d..6902ca9 100644
--- a/pulsar/internal/metrics.go
+++ b/pulsar/internal/metrics.go
@@ -282,39 +282,186 @@ func NewMetricsProvider(userDefinedLabels map[string]string) *Metrics {
}),
}
- prometheus.DefaultRegisterer.Register(metrics.messagesPublished)
- prometheus.DefaultRegisterer.Register(metrics.bytesPublished)
- prometheus.DefaultRegisterer.Register(metrics.messagesPending)
- prometheus.DefaultRegisterer.Register(metrics.bytesPending)
- prometheus.DefaultRegisterer.Register(metrics.publishErrors)
- prometheus.DefaultRegisterer.Register(metrics.publishLatency)
- prometheus.DefaultRegisterer.Register(metrics.publishRPCLatency)
-
- prometheus.DefaultRegisterer.Register(metrics.messagesReceived)
- prometheus.DefaultRegisterer.Register(metrics.bytesReceived)
- prometheus.DefaultRegisterer.Register(metrics.prefetchedMessages)
- prometheus.DefaultRegisterer.Register(metrics.prefetchedBytes)
- prometheus.DefaultRegisterer.Register(metrics.acksCounter)
- prometheus.DefaultRegisterer.Register(metrics.nacksCounter)
- prometheus.DefaultRegisterer.Register(metrics.dlqCounter)
- prometheus.DefaultRegisterer.Register(metrics.processingTime)
-
- prometheus.DefaultRegisterer.Register(metrics.producersOpened)
- prometheus.DefaultRegisterer.Register(metrics.producersClosed)
- prometheus.DefaultRegisterer.Register(metrics.producersPartitions)
- prometheus.DefaultRegisterer.Register(metrics.consumersOpened)
- prometheus.DefaultRegisterer.Register(metrics.consumersClosed)
- prometheus.DefaultRegisterer.Register(metrics.consumersPartitions)
- prometheus.DefaultRegisterer.Register(metrics.readersOpened)
- prometheus.DefaultRegisterer.Register(metrics.readersClosed)
-
- prometheus.DefaultRegisterer.Register(metrics.ConnectionsOpened)
- prometheus.DefaultRegisterer.Register(metrics.ConnectionsClosed)
- prometheus.DefaultRegisterer.Register(metrics.ConnectionsEstablishmentErrors)
- prometheus.DefaultRegisterer.Register(metrics.ConnectionsHandshakeErrors)
- prometheus.DefaultRegisterer.Register(metrics.LookupRequestsCount)
- prometheus.DefaultRegisterer.Register(metrics.PartitionedTopicMetadataRequestsCount)
- prometheus.DefaultRegisterer.Register(metrics.RPCRequestCount)
+ err := prometheus.DefaultRegisterer.Register(metrics.messagesPublished)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.messagesPublished = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.bytesPublished)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.bytesPublished = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.messagesPending)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.messagesPending = are.ExistingCollector.(*prometheus.GaugeVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.bytesPending)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.bytesPending = are.ExistingCollector.(*prometheus.GaugeVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.publishErrors)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.publishErrors = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.publishLatency)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.publishLatency = are.ExistingCollector.(*prometheus.HistogramVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.publishRPCLatency)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.publishRPCLatency = are.ExistingCollector.(*prometheus.HistogramVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.messagesReceived)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.messagesReceived = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.bytesReceived)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.bytesReceived = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.prefetchedMessages)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.prefetchedMessages = are.ExistingCollector.(*prometheus.GaugeVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.prefetchedBytes)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.prefetchedBytes = are.ExistingCollector.(*prometheus.GaugeVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.acksCounter)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.acksCounter = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.nacksCounter)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.nacksCounter = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.dlqCounter)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.dlqCounter = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.processingTime)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.processingTime = are.ExistingCollector.(*prometheus.HistogramVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.producersOpened)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.producersOpened = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.producersClosed)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.producersClosed = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.producersPartitions)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.producersPartitions = are.ExistingCollector.(*prometheus.GaugeVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.consumersOpened)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.consumersOpened = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.consumersClosed)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.consumersClosed = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.consumersPartitions)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.consumersPartitions = are.ExistingCollector.(*prometheus.GaugeVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.readersOpened)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.readersOpened = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.readersClosed)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.readersClosed = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsOpened)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.ConnectionsOpened = are.ExistingCollector.(prometheus.Counter)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsClosed)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.ConnectionsClosed = are.ExistingCollector.(prometheus.Counter)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsEstablishmentErrors)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.ConnectionsEstablishmentErrors = are.ExistingCollector.(prometheus.Counter)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsHandshakeErrors)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.ConnectionsHandshakeErrors = are.ExistingCollector.(prometheus.Counter)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.LookupRequestsCount)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.LookupRequestsCount = are.ExistingCollector.(prometheus.Counter)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.PartitionedTopicMetadataRequestsCount)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.PartitionedTopicMetadataRequestsCount = are.ExistingCollector.(prometheus.Counter)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.RPCRequestCount)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.RPCRequestCount = are.ExistingCollector.(prometheus.Counter)
+ }
+ }
return metrics
}