This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ce79489  User-defined metric cardinality (#604)
ce79489 is described below

commit ce794898bce8d0799e143b42833167252a3b6cd0
Author: Andy Walker <[email protected]>
AuthorDate: Sat Oct 9 05:03:58 2021 -0400

    User-defined metric cardinality (#604)
    
    * initial commit
    
    * forgot "none"
    
    * Satisfy the lint gods
    
    Co-authored-by: xiaolongran <[email protected]>
---
 go.mod                                 |  2 +
 go.sum                                 |  2 -
 pulsar/client.go                       | 15 ++++++
 pulsar/client_impl.go                  |  8 ++-
 pulsar/consumer_impl.go                |  4 +-
 pulsar/consumer_partition.go           |  4 +-
 pulsar/consumer_partition_test.go      |  6 +--
 pulsar/internal/lookup_service_test.go | 29 ++++++-----
 pulsar/internal/metrics.go             | 95 +++++++++++++++++++++-------------
 pulsar/producer_impl.go                |  4 +-
 pulsar/producer_partition.go           |  4 +-
 pulsar/reader_impl.go                  |  4 +-
 12 files changed, 111 insertions(+), 66 deletions(-)

diff --git a/go.mod b/go.mod
index 354f5b4..af7c571 100644
--- a/go.mod
+++ b/go.mod
@@ -15,6 +15,8 @@ require (
        github.com/inconshreveable/mousetrap v1.0.0 // indirect
        github.com/klauspost/compress v1.10.8
        github.com/linkedin/goavro/v2 v2.9.8
+       github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // 
indirect
+       github.com/modern-go/reflect2 v1.0.1 // indirect
        github.com/opentracing/opentracing-go v1.2.0
        github.com/pierrec/lz4 v2.0.5+incompatible
        github.com/pkg/errors v0.9.1
diff --git a/go.sum b/go.sum
index 8b372c7..85fba89 100644
--- a/go.sum
+++ b/go.sum
@@ -29,7 +29,6 @@ github.com/danieljoos/wincred v1.0.2/go.mod 
h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3E
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/dgrijalva/jwt-go v3.2.0+incompatible 
h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod 
h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/dimfeld/httptreemux v5.0.1+incompatible 
h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
 github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod 
h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
@@ -159,7 +158,6 @@ github.com/stretchr/objx v0.2.0 
h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
 github.com/stretchr/objx v0.2.0/go.mod 
h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
 github.com/stretchr/testify v1.2.2/go.mod 
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0/go.mod 
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
-github.com/stretchr/testify v1.4.0 
h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
 github.com/stretchr/testify v1.4.0/go.mod 
h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/stretchr/testify v1.5.1 
h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
 github.com/stretchr/testify v1.5.1/go.mod 
h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
diff --git a/pulsar/client.go b/pulsar/client.go
index cc6fb3c..8ff152a 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -117,6 +117,10 @@ type ClientOptions struct {
        // FIXME: use `logger` as internal field name instead of `log` as it's 
more idiomatic
        Logger log.Logger
 
+       // Specify metric cardinality to the tenant, namespace or topic levels, 
or remove it completely.
+       // Default: MetricsCardinalityNamespace
+       MetricsCardinality MetricsCardinality
+
        // Add custom labels to all the metrics reported by this client instance
        CustomMetricsLabels map[string]string
 }
@@ -150,3 +154,14 @@ type Client interface {
        // Close Closes the Client and free associated resources
        Close()
 }
+
+// MetricsCardinality represents the specificty of labels on a per-metric basis
+type MetricsCardinality int
+
+const (
+       _                           MetricsCardinality = iota
+       MetricsCardinalityNone                         // Do not add additional 
labels to metrics
+       MetricsCardinalityTenant                       // Label metrics by 
tenant
+       MetricsCardinalityNamespace                    // Label metrics by 
tenant and namespace
+       MetricsCardinalityTopic                        // Label metrics by topic
+)
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 7d2fcfd..5682927 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -107,11 +107,15 @@ func newClient(options ClientOptions) (Client, error) {
                maxConnectionsPerHost = 1
        }
 
+       if options.MetricsCardinality == 0 {
+               options.MetricsCardinality = MetricsCardinalityNamespace
+       }
+
        var metrics *internal.Metrics
        if options.CustomMetricsLabels != nil {
-               metrics = 
internal.NewMetricsProvider(options.CustomMetricsLabels)
+               metrics = 
internal.NewMetricsProvider(int(options.MetricsCardinality), 
options.CustomMetricsLabels)
        } else {
-               metrics = internal.NewMetricsProvider(map[string]string{})
+               metrics = 
internal.NewMetricsProvider(int(options.MetricsCardinality), 
map[string]string{})
        }
 
        c := &client{
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 232079b..6aef497 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -57,7 +57,7 @@ type consumer struct {
        stopDiscovery func()
 
        log     log.Logger
-       metrics *internal.TopicMetrics
+       metrics *internal.LeveledMetrics
 }
 
 func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
@@ -197,7 +197,7 @@ func newInternalConsumer(client *client, options 
ConsumerOptions, topic string,
                rlq:                       rlq,
                log:                       
client.log.SubLogger(log.Fields{"topic": topic}),
                consumerName:              options.Name,
-               metrics:                   
client.metrics.GetTopicMetrics(topic),
+               metrics:                   
client.metrics.GetLeveledMetrics(topic),
        }
 
        err := consumer.internalTopicSubscribeToPartitions()
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index e691d14..30639bd 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -144,13 +144,13 @@ type partitionConsumer struct {
 
        providersMutex       sync.RWMutex
        compressionProviders map[pb.CompressionType]compression.Provider
-       metrics              *internal.TopicMetrics
+       metrics              *internal.LeveledMetrics
        decryptor            cryptointernal.Decryptor
 }
 
 func newPartitionConsumer(parent Consumer, client *client, options 
*partitionConsumerOpts,
        messageCh chan ConsumerMessage, dlq *dlqRouter,
-       metrics *internal.TopicMetrics) (*partitionConsumer, error) {
+       metrics *internal.LeveledMetrics) (*partitionConsumer, error) {
        pc := &partitionConsumer{
                parentConsumer:       parent,
                client:               client,
diff --git a/pulsar/consumer_partition_test.go 
b/pulsar/consumer_partition_test.go
index 560afb6..8433049 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -36,7 +36,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
                eventsCh:             eventsCh,
                compressionProviders: 
make(map[pb.CompressionType]compression.Provider),
                options:              &partitionConsumerOpts{},
-               metrics:              
internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"),
+               metrics:              internal.NewMetricsProvider(4, 
map[string]string{}).GetLeveledMetrics("topic"),
                decryptor:            crypto.NewNoopDecryptor(),
        }
 
@@ -68,7 +68,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
                eventsCh:             eventsCh,
                compressionProviders: 
make(map[pb.CompressionType]compression.Provider),
                options:              &partitionConsumerOpts{},
-               metrics:              
internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"),
+               metrics:              internal.NewMetricsProvider(4, 
map[string]string{}).GetLeveledMetrics("topic"),
                decryptor:            crypto.NewNoopDecryptor(),
        }
 
@@ -100,7 +100,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
                eventsCh:             eventsCh,
                compressionProviders: 
make(map[pb.CompressionType]compression.Provider),
                options:              &partitionConsumerOpts{},
-               metrics:              
internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"),
+               metrics:              internal.NewMetricsProvider(4, 
map[string]string{}).GetLeveledMetrics("topic"),
                decryptor:            crypto.NewNoopDecryptor(),
        }
 
diff --git a/pulsar/internal/lookup_service_test.go 
b/pulsar/internal/lookup_service_test.go
index 1604319..6bab008 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -138,7 +138,7 @@ func TestLookupSuccess(t *testing.T) {
                                BrokerServiceUrl: 
proto.String("pulsar://broker-1:6650"),
                        },
                },
-       }, url, serviceNameResolver, false, "", log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
+       }, url, serviceNameResolver, false, "", log.DefaultNopLogger(), 
NewMetricsProvider(4, map[string]string{}))
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -172,7 +172,7 @@ func TestTlsLookupSuccess(t *testing.T) {
                                BrokerServiceUrlTls: 
proto.String("pulsar+ssl://broker-1:6651"),
                        },
                },
-       }, url, serviceNameResolver, true, "", log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
+       }, url, serviceNameResolver, true, "", log.DefaultNopLogger(), 
NewMetricsProvider(4, map[string]string{}))
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -207,7 +207,7 @@ func TestLookupWithProxy(t *testing.T) {
                                ProxyThroughServiceUrl: proto.Bool(true),
                        },
                },
-       }, url, serviceNameResolver, false, "", log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
+       }, url, serviceNameResolver, false, "", log.DefaultNopLogger(), 
NewMetricsProvider(4, map[string]string{}))
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -241,7 +241,8 @@ func TestTlsLookupWithProxy(t *testing.T) {
                                ProxyThroughServiceUrl: proto.Bool(true),
                        },
                },
-       }, url, NewPulsarServiceNameResolver(url), true, "", 
log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+       }, url, NewPulsarServiceNameResolver(url), true, "", 
log.DefaultNopLogger(),
+               NewMetricsProvider(4, map[string]string{}))
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -287,7 +288,8 @@ func TestLookupWithRedirect(t *testing.T) {
                                BrokerServiceUrl: 
proto.String("pulsar://broker-1:6650"),
                        },
                },
-       }, url, NewPulsarServiceNameResolver(url), false, "", 
log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+       }, url, NewPulsarServiceNameResolver(url), false, "", 
log.DefaultNopLogger(),
+               NewMetricsProvider(4, map[string]string{}))
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -333,7 +335,8 @@ func TestTlsLookupWithRedirect(t *testing.T) {
                                BrokerServiceUrlTls: 
proto.String("pulsar+ssl://broker-1:6651"),
                        },
                },
-       }, url, NewPulsarServiceNameResolver(url), true, "", 
log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+       }, url, NewPulsarServiceNameResolver(url), true, "", 
log.DefaultNopLogger(),
+               NewMetricsProvider(4, map[string]string{}))
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -367,7 +370,8 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
                                ProxyThroughServiceUrl: proto.Bool(false),
                        },
                },
-       }, url, NewPulsarServiceNameResolver(url), false, "", 
log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+       }, url, NewPulsarServiceNameResolver(url), false, "", 
log.DefaultNopLogger(),
+               NewMetricsProvider(4, map[string]string{}))
 
        lr, err := ls.Lookup("my-topic")
        assert.Error(t, err)
@@ -396,7 +400,8 @@ func TestLookupWithLookupFailure(t *testing.T) {
                                Authoritative: proto.Bool(true),
                        },
                },
-       }, url, NewPulsarServiceNameResolver(url), false, "", 
log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+       }, url, NewPulsarServiceNameResolver(url), false, "", 
log.DefaultNopLogger(),
+               NewMetricsProvider(4, map[string]string{}))
 
        lr, err := ls.Lookup("my-topic")
        assert.Error(t, err)
@@ -483,7 +488,7 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
                                Response:   
pb.CommandPartitionedTopicMetadataResponse_Success.Enum(),
                        },
                },
-       }, url, serviceNameResolver, false, "", log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
+       }, url, serviceNameResolver, false, "", log.DefaultNopLogger(), 
NewMetricsProvider(4, map[string]string{}))
 
        metadata, err := ls.GetPartitionedTopicMetadata("my-topic")
        assert.NoError(t, err)
@@ -515,7 +520,7 @@ func TestLookupSuccessWithMultipleHosts(t *testing.T) {
                                BrokerServiceUrl: 
proto.String("pulsar://broker-1:6650"),
                        },
                },
-       }, url, serviceNameResolver, false, "", log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
+       }, url, serviceNameResolver, false, "", log.DefaultNopLogger(), 
NewMetricsProvider(4, map[string]string{}))
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -576,7 +581,7 @@ func TestHttpLookupSuccess(t *testing.T) {
        serviceNameResolver := NewPulsarServiceNameResolver(url)
        httpClient := NewMockHTTPClient(serviceNameResolver)
        ls := NewHTTPLookupService(httpClient, url, serviceNameResolver, false,
-               log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+               log.DefaultNopLogger(), NewMetricsProvider(4, 
map[string]string{}))
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -592,7 +597,7 @@ func TestHttpGetPartitionedTopicMetadataSuccess(t 
*testing.T) {
        serviceNameResolver := NewPulsarServiceNameResolver(url)
        httpClient := NewMockHTTPClient(serviceNameResolver)
        ls := NewHTTPLookupService(httpClient, url, serviceNameResolver, false,
-               log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+               log.DefaultNopLogger(), NewMetricsProvider(4, 
map[string]string{}))
 
        tMetadata, err := ls.GetPartitionedTopicMetadata("my-topic")
        assert.NoError(t, err)
diff --git a/pulsar/internal/metrics.go b/pulsar/internal/metrics.go
index 6902ca9..ec1a96e 100644
--- a/pulsar/internal/metrics.go
+++ b/pulsar/internal/metrics.go
@@ -21,9 +21,8 @@ import (
        "github.com/prometheus/client_golang/prometheus"
 )
 
-var topicLabelNames = []string{"pulsar_tenant", "pulsar_namespace", "topic"}
-
 type Metrics struct {
+       metricsLevel      int
        messagesPublished *prometheus.CounterVec
        bytesPublished    *prometheus.CounterVec
        messagesPending   *prometheus.GaugeVec
@@ -50,7 +49,7 @@ type Metrics struct {
        readersOpened       *prometheus.CounterVec
        readersClosed       *prometheus.CounterVec
 
-       // Metrics that are not labeled with topic, are immediately available
+       // Metrics that are not labeled with specificity are immediately 
available
        ConnectionsOpened                     prometheus.Counter
        ConnectionsClosed                     prometheus.Counter
        ConnectionsEstablishmentErrors        prometheus.Counter
@@ -60,7 +59,7 @@ type Metrics struct {
        RPCRequestCount                       prometheus.Counter
 }
 
-type TopicMetrics struct {
+type LeveledMetrics struct {
        MessagesPublished        prometheus.Counter
        BytesPublished           prometheus.Counter
        MessagesPending          prometheus.Gauge
@@ -89,155 +88,171 @@ type TopicMetrics struct {
        ReadersClosed       prometheus.Counter
 }
 
-func NewMetricsProvider(userDefinedLabels map[string]string) *Metrics {
+func NewMetricsProvider(metricsCardinality int, userDefinedLabels 
map[string]string) *Metrics {
        constLabels := map[string]string{
                "client": "go",
        }
        for k, v := range userDefinedLabels {
                constLabels[k] = v
        }
+       var metricsLevelLabels []string
+
+       // note: ints here mirror MetricsCardinality in client.go to avoid 
import cycle
+       switch metricsCardinality {
+       case 1: //MetricsCardinalityNone
+               metricsLevelLabels = []string{}
+       case 2: //MetricsCardinalityTenant
+               metricsLevelLabels = []string{"pulsar_tenant"}
+       case 3: //MetricsCardinalityNamespace
+               metricsLevelLabels = []string{"pulsar_tenant", 
"pulsar_namespace"}
+       case 4: //MetricsCardinalityTopic
+               metricsLevelLabels = []string{"pulsar_tenant", 
"pulsar_namespace", "topic"}
+       default: //Anything else is namespace
+               metricsLevelLabels = []string{"pulsar_tenant", 
"pulsar_namespace"}
+       }
 
        metrics := &Metrics{
+               metricsLevel: metricsCardinality,
                messagesPublished: 
prometheus.NewCounterVec(prometheus.CounterOpts{
                        Name:        "pulsar_client_messages_published",
                        Help:        "Counter of messages published by the 
client",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                bytesPublished: prometheus.NewCounterVec(prometheus.CounterOpts{
                        Name:        "pulsar_client_bytes_published",
                        Help:        "Counter of messages published by the 
client",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                messagesPending: prometheus.NewGaugeVec(prometheus.GaugeOpts{
                        Name:        "pulsar_client_producer_pending_messages",
                        Help:        "Counter of messages pending to be 
published by the client",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                bytesPending: prometheus.NewGaugeVec(prometheus.GaugeOpts{
                        Name:        "pulsar_client_producer_pending_bytes",
                        Help:        "Counter of bytes pending to be published 
by the client",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                publishErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
                        Name:        "pulsar_client_producer_errors",
                        Help:        "Counter of publish errors",
                        ConstLabels: constLabels,
-               }, append(topicLabelNames, "error")),
+               }, append(metricsLevelLabels, "error")),
 
                publishLatency: 
prometheus.NewHistogramVec(prometheus.HistogramOpts{
                        Name:        "pulsar_client_producer_latency_seconds",
                        Help:        "Publish latency experienced by the 
client",
                        ConstLabels: constLabels,
                        Buckets:     []float64{.0005, .001, .005, .01, .025, 
.05, .1, .25, .5, 1, 2.5, 5, 10},
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                publishRPCLatency: 
prometheus.NewHistogramVec(prometheus.HistogramOpts{
                        Name:        
"pulsar_client_producer_rpc_latency_seconds",
                        Help:        "Publish RPC latency experienced 
internally by the client when sending data to receiving an ack",
                        ConstLabels: constLabels,
                        Buckets:     []float64{.0005, .001, .005, .01, .025, 
.05, .1, .25, .5, 1, 2.5, 5, 10},
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                producersOpened: 
prometheus.NewCounterVec(prometheus.CounterOpts{
                        Name:        "pulsar_client_producers_opened",
                        Help:        "Counter of producers created by the 
client",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                producersClosed: 
prometheus.NewCounterVec(prometheus.CounterOpts{
                        Name:        "pulsar_client_producers_closed",
                        Help:        "Counter of producers closed by the 
client",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                producersPartitions: 
prometheus.NewGaugeVec(prometheus.GaugeOpts{
                        Name:        
"pulsar_client_producers_partitions_active",
                        Help:        "Counter of individual partitions the 
producers are currently active",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                consumersOpened: 
prometheus.NewCounterVec(prometheus.CounterOpts{
                        Name:        "pulsar_client_consumers_opened",
                        Help:        "Counter of consumers created by the 
client",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                consumersClosed: 
prometheus.NewCounterVec(prometheus.CounterOpts{
                        Name:        "pulsar_client_consumers_closed",
                        Help:        "Counter of consumers closed by the 
client",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                consumersPartitions: 
prometheus.NewGaugeVec(prometheus.GaugeOpts{
                        Name:        
"pulsar_client_consumers_partitions_active",
                        Help:        "Counter of individual partitions the 
consumers are currently active",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                messagesReceived: 
prometheus.NewCounterVec(prometheus.CounterOpts{
                        Name:        "pulsar_client_messages_received",
                        Help:        "Counter of messages received by the 
client",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                bytesReceived: prometheus.NewCounterVec(prometheus.CounterOpts{
                        Name:        "pulsar_client_bytes_received",
                        Help:        "Counter of bytes received by the client",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                prefetchedMessages: prometheus.NewGaugeVec(prometheus.GaugeOpts{
                        Name:        
"pulsar_client_consumer_prefetched_messages",
                        Help:        "Number of messages currently sitting in 
the consumer pre-fetch queue",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                prefetchedBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{
                        Name:        "pulsar_client_consumer_prefetched_bytes",
                        Help:        "Total number of bytes currently sitting 
in the consumer pre-fetch queue",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                acksCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
                        Name:        "pulsar_client_consumer_acks",
                        Help:        "Counter of messages acked by client",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                nacksCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
                        Name:        "pulsar_client_consumer_nacks",
                        Help:        "Counter of messages nacked by client",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                dlqCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
                        Name:        "pulsar_client_consumer_dlq_messages",
                        Help:        "Counter of messages sent to Dead letter 
queue",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                processingTime: 
prometheus.NewHistogramVec(prometheus.HistogramOpts{
                        Name:        
"pulsar_client_consumer_processing_time_seconds",
                        Help:        "Time it takes for application to process 
messages",
                        Buckets:     []float64{.0005, .001, .005, .01, .025, 
.05, .1, .25, .5, 1, 2.5, 5, 10},
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                readersOpened: prometheus.NewCounterVec(prometheus.CounterOpts{
                        Name:        "pulsar_client_readers_opened",
                        Help:        "Counter of readers created by the client",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                readersClosed: prometheus.NewCounterVec(prometheus.CounterOpts{
                        Name:        "pulsar_client_readers_closed",
                        Help:        "Counter of readers closed by the client",
                        ConstLabels: constLabels,
-               }, topicLabelNames),
+               }, metricsLevelLabels),
 
                ConnectionsOpened: prometheus.NewCounter(prometheus.CounterOpts{
                        Name:        "pulsar_client_connections_opened",
@@ -465,16 +480,22 @@ func NewMetricsProvider(userDefinedLabels 
map[string]string) *Metrics {
        return metrics
 }
 
-func (mp *Metrics) GetTopicMetrics(t string) *TopicMetrics {
+func (mp *Metrics) GetLeveledMetrics(t string) *LeveledMetrics {
+       labels := make(map[string]string, 3)
        tn, _ := ParseTopicName(t)
        topic := TopicNameWithoutPartitionPart(tn)
-       labels := map[string]string{
-               "pulsar_tenant":    tn.Tenant,
-               "pulsar_namespace": tn.Namespace,
-               "topic":            topic,
-       }
-
-       tm := &TopicMetrics{
+       switch mp.metricsLevel {
+       case 4:
+               labels["topic"] = topic
+               fallthrough
+       case 3:
+               labels["pulsar_namespace"] = tn.Namespace
+               fallthrough
+       case 2:
+               labels["pulsar_tenant"] = tn.Tenant
+       }
+
+       lm := &LeveledMetrics{
                MessagesPublished:        mp.messagesPublished.With(labels),
                BytesPublished:           mp.bytesPublished.With(labels),
                MessagesPending:          mp.messagesPending.With(labels),
@@ -503,7 +524,7 @@ func (mp *Metrics) GetTopicMetrics(t string) *TopicMetrics {
                ReadersClosed:       mp.readersClosed.With(labels),
        }
 
-       return tm
+       return lm
 }
 
 func mergeMaps(a, b map[string]string) map[string]string {
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 20e8d3d..48e2aa4 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -57,7 +57,7 @@ type producer struct {
        closeOnce     sync.Once
        stopDiscovery func()
        log           log.Logger
-       metrics       *internal.TopicMetrics
+       metrics       *internal.LeveledMetrics
 }
 
 func getHashingFunction(s HashingScheme) func(string) uint32 {
@@ -97,7 +97,7 @@ func newProducer(client *client, options *ProducerOptions) 
(*producer, error) {
                topic:   options.Topic,
                client:  client,
                log:     client.log.SubLogger(log.Fields{"topic": 
options.Topic}),
-               metrics: client.metrics.GetTopicMetrics(options.Topic),
+               metrics: client.metrics.GetLeveledMetrics(options.Topic),
        }
 
        if options.Interceptors == nil {
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index e273021..e1762f9 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -84,13 +84,13 @@ type partitionProducer struct {
        lastSequenceID   int64
        schemaInfo       *SchemaInfo
        partitionIdx     int32
-       metrics          *internal.TopicMetrics
+       metrics          *internal.LeveledMetrics
 
        epoch uint64
 }
 
 func newPartitionProducer(client *client, topic string, options 
*ProducerOptions, partitionIdx int,
-       metrics *internal.TopicMetrics) (
+       metrics *internal.LeveledMetrics) (
        *partitionProducer, error) {
        var batchingMaxPublishDelay time.Duration
        if options.BatchingMaxPublishDelay != 0 {
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 0e63f8b..c854325 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -38,7 +38,7 @@ type reader struct {
        messageCh           chan ConsumerMessage
        lastMessageInBroker trackingMessageID
        log                 log.Logger
-       metrics             *internal.TopicMetrics
+       metrics             *internal.LeveledMetrics
 }
 
 func newReader(client *client, options ReaderOptions) (Reader, error) {
@@ -96,7 +96,7 @@ func newReader(client *client, options ReaderOptions) 
(Reader, error) {
                client:    client,
                messageCh: make(chan ConsumerMessage),
                log:       client.log.SubLogger(log.Fields{"topic": 
options.Topic}),
-               metrics:   client.metrics.GetTopicMetrics(options.Topic),
+               metrics:   client.metrics.GetLeveledMetrics(options.Topic),
        }
 
        // Provide dummy dlq router with not dlq policy

Reply via email to