This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a3ad70  Fix missing metrics for topics by registration of existing 
collector (#600)
1a3ad70 is described below

commit 1a3ad70403aa04feda83edcfd984e25cef2dcfe6
Author: pkutilina <[email protected]>
AuthorDate: Wed Aug 25 20:18:18 2021 +0200

    Fix missing metrics for topics by registration of existing collector (#600)
---
 pulsar/internal/metrics.go | 213 ++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 180 insertions(+), 33 deletions(-)

diff --git a/pulsar/internal/metrics.go b/pulsar/internal/metrics.go
index d44522d..6902ca9 100644
--- a/pulsar/internal/metrics.go
+++ b/pulsar/internal/metrics.go
@@ -282,39 +282,186 @@ func NewMetricsProvider(userDefinedLabels 
map[string]string) *Metrics {
                }),
        }
 
-       prometheus.DefaultRegisterer.Register(metrics.messagesPublished)
-       prometheus.DefaultRegisterer.Register(metrics.bytesPublished)
-       prometheus.DefaultRegisterer.Register(metrics.messagesPending)
-       prometheus.DefaultRegisterer.Register(metrics.bytesPending)
-       prometheus.DefaultRegisterer.Register(metrics.publishErrors)
-       prometheus.DefaultRegisterer.Register(metrics.publishLatency)
-       prometheus.DefaultRegisterer.Register(metrics.publishRPCLatency)
-
-       prometheus.DefaultRegisterer.Register(metrics.messagesReceived)
-       prometheus.DefaultRegisterer.Register(metrics.bytesReceived)
-       prometheus.DefaultRegisterer.Register(metrics.prefetchedMessages)
-       prometheus.DefaultRegisterer.Register(metrics.prefetchedBytes)
-       prometheus.DefaultRegisterer.Register(metrics.acksCounter)
-       prometheus.DefaultRegisterer.Register(metrics.nacksCounter)
-       prometheus.DefaultRegisterer.Register(metrics.dlqCounter)
-       prometheus.DefaultRegisterer.Register(metrics.processingTime)
-
-       prometheus.DefaultRegisterer.Register(metrics.producersOpened)
-       prometheus.DefaultRegisterer.Register(metrics.producersClosed)
-       prometheus.DefaultRegisterer.Register(metrics.producersPartitions)
-       prometheus.DefaultRegisterer.Register(metrics.consumersOpened)
-       prometheus.DefaultRegisterer.Register(metrics.consumersClosed)
-       prometheus.DefaultRegisterer.Register(metrics.consumersPartitions)
-       prometheus.DefaultRegisterer.Register(metrics.readersOpened)
-       prometheus.DefaultRegisterer.Register(metrics.readersClosed)
-
-       prometheus.DefaultRegisterer.Register(metrics.ConnectionsOpened)
-       prometheus.DefaultRegisterer.Register(metrics.ConnectionsClosed)
-       
prometheus.DefaultRegisterer.Register(metrics.ConnectionsEstablishmentErrors)
-       
prometheus.DefaultRegisterer.Register(metrics.ConnectionsHandshakeErrors)
-       prometheus.DefaultRegisterer.Register(metrics.LookupRequestsCount)
-       
prometheus.DefaultRegisterer.Register(metrics.PartitionedTopicMetadataRequestsCount)
-       prometheus.DefaultRegisterer.Register(metrics.RPCRequestCount)
+       err := prometheus.DefaultRegisterer.Register(metrics.messagesPublished)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.messagesPublished = 
are.ExistingCollector.(*prometheus.CounterVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.bytesPublished)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.bytesPublished = 
are.ExistingCollector.(*prometheus.CounterVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.messagesPending)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.messagesPending = 
are.ExistingCollector.(*prometheus.GaugeVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.bytesPending)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.bytesPending = 
are.ExistingCollector.(*prometheus.GaugeVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.publishErrors)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.publishErrors = 
are.ExistingCollector.(*prometheus.CounterVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.publishLatency)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.publishLatency = 
are.ExistingCollector.(*prometheus.HistogramVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.publishRPCLatency)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.publishRPCLatency = 
are.ExistingCollector.(*prometheus.HistogramVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.messagesReceived)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.messagesReceived = 
are.ExistingCollector.(*prometheus.CounterVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.bytesReceived)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.bytesReceived = 
are.ExistingCollector.(*prometheus.CounterVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.prefetchedMessages)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.prefetchedMessages = 
are.ExistingCollector.(*prometheus.GaugeVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.prefetchedBytes)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.prefetchedBytes = 
are.ExistingCollector.(*prometheus.GaugeVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.acksCounter)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.acksCounter = 
are.ExistingCollector.(*prometheus.CounterVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.nacksCounter)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.nacksCounter = 
are.ExistingCollector.(*prometheus.CounterVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.dlqCounter)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.dlqCounter = 
are.ExistingCollector.(*prometheus.CounterVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.processingTime)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.processingTime = 
are.ExistingCollector.(*prometheus.HistogramVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.producersOpened)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.producersOpened = 
are.ExistingCollector.(*prometheus.CounterVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.producersClosed)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.producersClosed = 
are.ExistingCollector.(*prometheus.CounterVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.producersPartitions)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.producersPartitions = 
are.ExistingCollector.(*prometheus.GaugeVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.consumersOpened)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.consumersOpened = 
are.ExistingCollector.(*prometheus.CounterVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.consumersClosed)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.consumersClosed = 
are.ExistingCollector.(*prometheus.CounterVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.consumersPartitions)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.consumersPartitions = 
are.ExistingCollector.(*prometheus.GaugeVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.readersOpened)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.readersOpened = 
are.ExistingCollector.(*prometheus.CounterVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.readersClosed)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.readersClosed = 
are.ExistingCollector.(*prometheus.CounterVec)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsOpened)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.ConnectionsOpened = 
are.ExistingCollector.(prometheus.Counter)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsClosed)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.ConnectionsClosed = 
are.ExistingCollector.(prometheus.Counter)
+               }
+       }
+       err = 
prometheus.DefaultRegisterer.Register(metrics.ConnectionsEstablishmentErrors)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.ConnectionsEstablishmentErrors = 
are.ExistingCollector.(prometheus.Counter)
+               }
+       }
+       err = 
prometheus.DefaultRegisterer.Register(metrics.ConnectionsHandshakeErrors)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.ConnectionsHandshakeErrors = 
are.ExistingCollector.(prometheus.Counter)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.LookupRequestsCount)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.LookupRequestsCount = 
are.ExistingCollector.(prometheus.Counter)
+               }
+       }
+       err = 
prometheus.DefaultRegisterer.Register(metrics.PartitionedTopicMetadataRequestsCount)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.PartitionedTopicMetadataRequestsCount = 
are.ExistingCollector.(prometheus.Counter)
+               }
+       }
+       err = prometheus.DefaultRegisterer.Register(metrics.RPCRequestCount)
+       if err != nil {
+               if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+                       metrics.RPCRequestCount = 
are.ExistingCollector.(prometheus.Counter)
+               }
+       }
        return metrics
 }
 

Reply via email to