This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new ce79489 User-defined metric cardinality (#604)
ce79489 is described below
commit ce794898bce8d0799e143b42833167252a3b6cd0
Author: Andy Walker <[email protected]>
AuthorDate: Sat Oct 9 05:03:58 2021 -0400
User-defined metric cardinality (#604)
* initial commit
* forgot "none"
* Satisfy the lint gods
Co-authored-by: xiaolongran <[email protected]>
---
go.mod | 2 +
go.sum | 2 -
pulsar/client.go | 15 ++++++
pulsar/client_impl.go | 8 ++-
pulsar/consumer_impl.go | 4 +-
pulsar/consumer_partition.go | 4 +-
pulsar/consumer_partition_test.go | 6 +--
pulsar/internal/lookup_service_test.go | 29 ++++++-----
pulsar/internal/metrics.go | 95 +++++++++++++++++++++-------------
pulsar/producer_impl.go | 4 +-
pulsar/producer_partition.go | 4 +-
pulsar/reader_impl.go | 4 +-
12 files changed, 111 insertions(+), 66 deletions(-)
diff --git a/go.mod b/go.mod
index 354f5b4..af7c571 100644
--- a/go.mod
+++ b/go.mod
@@ -15,6 +15,8 @@ require (
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.10.8
github.com/linkedin/goavro/v2 v2.9.8
+	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+ github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/opentracing/opentracing-go v1.2.0
github.com/pierrec/lz4 v2.0.5+incompatible
github.com/pkg/errors v0.9.1
diff --git a/go.sum b/go.sum
index 8b372c7..85fba89 100644
--- a/go.sum
+++ b/go.sum
@@ -29,7 +29,6 @@ github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3E
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
@@ -159,7 +158,6 @@ github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
-github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
diff --git a/pulsar/client.go b/pulsar/client.go
index cc6fb3c..8ff152a 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -117,6 +117,10 @@ type ClientOptions struct {
	// FIXME: use `logger` as internal field name instead of `log` as it's more idiomatic
	Logger log.Logger
+	// Specify metric cardinality at the tenant, namespace, or topic level, or remove the labels entirely.
+ // Default: MetricsCardinalityNamespace
+ MetricsCardinality MetricsCardinality
+
// Add custom labels to all the metrics reported by this client instance
CustomMetricsLabels map[string]string
}
@@ -150,3 +154,14 @@ type Client interface {
	// Close Closes the Client and frees associated resources
Close()
}
+
+// MetricsCardinality represents the specificity of labels on a per-metric basis
+type MetricsCardinality int
+
+const (
+ _ MetricsCardinality = iota
+	MetricsCardinalityNone      // Do not add additional labels to metrics
+	MetricsCardinalityTenant    // Label metrics by tenant
+	MetricsCardinalityNamespace // Label metrics by tenant and namespace
+	MetricsCardinalityTopic     // Label metrics by topic
+)
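
For illustration, opting into tenant-level cardinality is a one-line addition to the client options. A minimal sketch, not part of this commit (the service URL is a placeholder):

package main

import (
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// Export metrics labeled by tenant only; leaving the option unset
	// (its zero value) falls back to MetricsCardinalityNamespace.
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:                "pulsar://localhost:6650", // placeholder
		MetricsCardinality: pulsar.MetricsCardinalityTenant,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
}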
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 7d2fcfd..5682927 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -107,11 +107,15 @@ func newClient(options ClientOptions) (Client, error) {
maxConnectionsPerHost = 1
}
+ if options.MetricsCardinality == 0 {
+ options.MetricsCardinality = MetricsCardinalityNamespace
+ }
+
var metrics *internal.Metrics
if options.CustomMetricsLabels != nil {
-		metrics = internal.NewMetricsProvider(options.CustomMetricsLabels)
+		metrics = internal.NewMetricsProvider(int(options.MetricsCardinality), options.CustomMetricsLabels)
	} else {
-		metrics = internal.NewMetricsProvider(map[string]string{})
+		metrics = internal.NewMetricsProvider(int(options.MetricsCardinality), map[string]string{})
}
c := &client{
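
Because the const block discards the zero value (the iota starts with `_`), an unset option is distinguishable from every explicit choice, which is what the defaulting above relies on. A standalone sketch of that check, with a hypothetical helper name:

package main

import (
	"fmt"

	"github.com/apache/pulsar-client-go/pulsar"
)

// applyDefaultCardinality mirrors the defaulting in newClient: the zero
// value (option left unset) falls back to namespace-level labels.
func applyDefaultCardinality(c pulsar.MetricsCardinality) pulsar.MetricsCardinality {
	if c == 0 {
		return pulsar.MetricsCardinalityNamespace
	}
	return c
}

func main() {
	var unset pulsar.MetricsCardinality
	fmt.Println(applyDefaultCardinality(unset) == pulsar.MetricsCardinalityNamespace) // true
}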
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 232079b..6aef497 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -57,7 +57,7 @@ type consumer struct {
stopDiscovery func()
log log.Logger
- metrics *internal.TopicMetrics
+ metrics *internal.LeveledMetrics
}
func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
@@ -197,7 +197,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
		rlq:                        rlq,
		log:                        client.log.SubLogger(log.Fields{"topic": topic}),
		consumerName:               options.Name,
-		metrics:                    client.metrics.GetTopicMetrics(topic),
+		metrics:                    client.metrics.GetLeveledMetrics(topic),
}
err := consumer.internalTopicSubscribeToPartitions()
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index e691d14..30639bd 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -144,13 +144,13 @@ type partitionConsumer struct {
providersMutex sync.RWMutex
compressionProviders map[pb.CompressionType]compression.Provider
- metrics *internal.TopicMetrics
+ metrics *internal.LeveledMetrics
decryptor cryptointernal.Decryptor
}
func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts,
messageCh chan ConsumerMessage, dlq *dlqRouter,
- metrics *internal.TopicMetrics) (*partitionConsumer, error) {
+ metrics *internal.LeveledMetrics) (*partitionConsumer, error) {
pc := &partitionConsumer{
parentConsumer: parent,
client: client,
diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go
index 560afb6..8433049 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -36,7 +36,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
eventsCh: eventsCh,
		compressionProviders: make(map[pb.CompressionType]compression.Provider),
		options:              &partitionConsumerOpts{},
-		metrics:              internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"),
+		metrics:              internal.NewMetricsProvider(4, map[string]string{}).GetLeveledMetrics("topic"),
decryptor: crypto.NewNoopDecryptor(),
}
@@ -68,7 +68,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
eventsCh: eventsCh,
		compressionProviders: make(map[pb.CompressionType]compression.Provider),
		options:              &partitionConsumerOpts{},
-		metrics:              internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"),
+		metrics:              internal.NewMetricsProvider(4, map[string]string{}).GetLeveledMetrics("topic"),
decryptor: crypto.NewNoopDecryptor(),
}
@@ -100,7 +100,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
eventsCh: eventsCh,
		compressionProviders: make(map[pb.CompressionType]compression.Provider),
		options:              &partitionConsumerOpts{},
-		metrics:              internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"),
+		metrics:              internal.NewMetricsProvider(4, map[string]string{}).GetLeveledMetrics("topic"),
decryptor: crypto.NewNoopDecryptor(),
}
diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go
index 1604319..6bab008 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -138,7 +138,7 @@ func TestLookupSuccess(t *testing.T) {
				BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
			},
		},
-	}, url, serviceNameResolver, false, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, serviceNameResolver, false, "", log.DefaultNopLogger(), NewMetricsProvider(4, map[string]string{}))
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -172,7 +172,7 @@ func TestTlsLookupSuccess(t *testing.T) {
				BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
			},
		},
-	}, url, serviceNameResolver, true, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, serviceNameResolver, true, "", log.DefaultNopLogger(), NewMetricsProvider(4, map[string]string{}))
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -207,7 +207,7 @@ func TestLookupWithProxy(t *testing.T) {
ProxyThroughServiceUrl: proto.Bool(true),
},
},
-	}, url, serviceNameResolver, false, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, serviceNameResolver, false, "", log.DefaultNopLogger(), NewMetricsProvider(4, map[string]string{}))
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -241,7 +241,8 @@ func TestTlsLookupWithProxy(t *testing.T) {
ProxyThroughServiceUrl: proto.Bool(true),
},
},
-	}, url, NewPulsarServiceNameResolver(url), true, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, NewPulsarServiceNameResolver(url), true, "", log.DefaultNopLogger(),
+		NewMetricsProvider(4, map[string]string{}))
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -287,7 +288,8 @@ func TestLookupWithRedirect(t *testing.T) {
				BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
			},
		},
-	}, url, NewPulsarServiceNameResolver(url), false, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, NewPulsarServiceNameResolver(url), false, "", log.DefaultNopLogger(),
+		NewMetricsProvider(4, map[string]string{}))
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -333,7 +335,8 @@ func TestTlsLookupWithRedirect(t *testing.T) {
				BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
			},
		},
-	}, url, NewPulsarServiceNameResolver(url), true, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, NewPulsarServiceNameResolver(url), true, "", log.DefaultNopLogger(),
+		NewMetricsProvider(4, map[string]string{}))
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -367,7 +370,8 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
ProxyThroughServiceUrl: proto.Bool(false),
},
},
-	}, url, NewPulsarServiceNameResolver(url), false, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, NewPulsarServiceNameResolver(url), false, "", log.DefaultNopLogger(),
+		NewMetricsProvider(4, map[string]string{}))
lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
@@ -396,7 +400,8 @@ func TestLookupWithLookupFailure(t *testing.T) {
Authoritative: proto.Bool(true),
},
},
-	}, url, NewPulsarServiceNameResolver(url), false, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, NewPulsarServiceNameResolver(url), false, "", log.DefaultNopLogger(),
+		NewMetricsProvider(4, map[string]string{}))
lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
@@ -483,7 +488,7 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
				Response: pb.CommandPartitionedTopicMetadataResponse_Success.Enum(),
			},
		},
-	}, url, serviceNameResolver, false, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, serviceNameResolver, false, "", log.DefaultNopLogger(), NewMetricsProvider(4, map[string]string{}))
metadata, err := ls.GetPartitionedTopicMetadata("my-topic")
assert.NoError(t, err)
@@ -515,7 +520,7 @@ func TestLookupSuccessWithMultipleHosts(t *testing.T) {
				BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
			},
		},
-	}, url, serviceNameResolver, false, "", log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, serviceNameResolver, false, "", log.DefaultNopLogger(), NewMetricsProvider(4, map[string]string{}))
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -576,7 +581,7 @@ func TestHttpLookupSuccess(t *testing.T) {
serviceNameResolver := NewPulsarServiceNameResolver(url)
httpClient := NewMockHTTPClient(serviceNameResolver)
ls := NewHTTPLookupService(httpClient, url, serviceNameResolver, false,
- log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+		log.DefaultNopLogger(), NewMetricsProvider(4, map[string]string{}))
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -592,7 +597,7 @@ func TestHttpGetPartitionedTopicMetadataSuccess(t *testing.T) {
serviceNameResolver := NewPulsarServiceNameResolver(url)
httpClient := NewMockHTTPClient(serviceNameResolver)
ls := NewHTTPLookupService(httpClient, url, serviceNameResolver, false,
- log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+		log.DefaultNopLogger(), NewMetricsProvider(4, map[string]string{}))
tMetadata, err := ls.GetPartitionedTopicMetadata("my-topic")
assert.NoError(t, err)
diff --git a/pulsar/internal/metrics.go b/pulsar/internal/metrics.go
index 6902ca9..ec1a96e 100644
--- a/pulsar/internal/metrics.go
+++ b/pulsar/internal/metrics.go
@@ -21,9 +21,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
-var topicLabelNames = []string{"pulsar_tenant", "pulsar_namespace", "topic"}
-
type Metrics struct {
+ metricsLevel int
messagesPublished *prometheus.CounterVec
bytesPublished *prometheus.CounterVec
messagesPending *prometheus.GaugeVec
@@ -50,7 +49,7 @@ type Metrics struct {
readersOpened *prometheus.CounterVec
readersClosed *prometheus.CounterVec
-	// Metrics that are not labeled with topic, are immediately available
+	// Metrics that are not labeled with specificity are immediately available
ConnectionsOpened prometheus.Counter
ConnectionsClosed prometheus.Counter
ConnectionsEstablishmentErrors prometheus.Counter
@@ -60,7 +59,7 @@ type Metrics struct {
RPCRequestCount prometheus.Counter
}
-type TopicMetrics struct {
+type LeveledMetrics struct {
MessagesPublished prometheus.Counter
BytesPublished prometheus.Counter
MessagesPending prometheus.Gauge
@@ -89,155 +88,171 @@ type TopicMetrics struct {
ReadersClosed prometheus.Counter
}
-func NewMetricsProvider(userDefinedLabels map[string]string) *Metrics {
+func NewMetricsProvider(metricsCardinality int, userDefinedLabels map[string]string) *Metrics {
constLabels := map[string]string{
"client": "go",
}
for k, v := range userDefinedLabels {
constLabels[k] = v
}
+	var metricsLevelLabels []string
+
+	// note: ints here mirror MetricsCardinality in client.go to avoid import cycle
+	switch metricsCardinality {
+	case 1: // MetricsCardinalityNone
+		metricsLevelLabels = []string{}
+	case 2: // MetricsCardinalityTenant
+		metricsLevelLabels = []string{"pulsar_tenant"}
+	case 3: // MetricsCardinalityNamespace
+		metricsLevelLabels = []string{"pulsar_tenant", "pulsar_namespace"}
+	case 4: // MetricsCardinalityTopic
+		metricsLevelLabels = []string{"pulsar_tenant", "pulsar_namespace", "topic"}
+	default: // Anything else is namespace
+		metricsLevelLabels = []string{"pulsar_tenant", "pulsar_namespace"}
+	}
metrics := &Metrics{
+ metricsLevel: metricsCardinality,
		messagesPublished: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name:        "pulsar_client_messages_published",
			Help:        "Counter of messages published by the client",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		bytesPublished: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name:        "pulsar_client_bytes_published",
			Help:        "Counter of messages published by the client",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		messagesPending: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Name:        "pulsar_client_producer_pending_messages",
			Help:        "Counter of messages pending to be published by the client",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		bytesPending: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Name:        "pulsar_client_producer_pending_bytes",
			Help:        "Counter of bytes pending to be published by the client",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		publishErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name:        "pulsar_client_producer_errors",
			Help:        "Counter of publish errors",
			ConstLabels: constLabels,
-		}, append(topicLabelNames, "error")),
+		}, append(metricsLevelLabels, "error")),
		publishLatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Name:        "pulsar_client_producer_latency_seconds",
			Help:        "Publish latency experienced by the client",
			ConstLabels: constLabels,
			Buckets:     []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		publishRPCLatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Name:        "pulsar_client_producer_rpc_latency_seconds",
			Help:        "Publish RPC latency experienced internally by the client when sending data to receiving an ack",
			ConstLabels: constLabels,
			Buckets:     []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		producersOpened: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name:        "pulsar_client_producers_opened",
			Help:        "Counter of producers created by the client",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		producersClosed: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name:        "pulsar_client_producers_closed",
			Help:        "Counter of producers closed by the client",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		producersPartitions: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Name:        "pulsar_client_producers_partitions_active",
			Help:        "Counter of individual partitions the producers are currently active",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		consumersOpened: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name:        "pulsar_client_consumers_opened",
			Help:        "Counter of consumers created by the client",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		consumersClosed: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name:        "pulsar_client_consumers_closed",
			Help:        "Counter of consumers closed by the client",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		consumersPartitions: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Name:        "pulsar_client_consumers_partitions_active",
			Help:        "Counter of individual partitions the consumers are currently active",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		messagesReceived: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name:        "pulsar_client_messages_received",
			Help:        "Counter of messages received by the client",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		bytesReceived: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name:        "pulsar_client_bytes_received",
			Help:        "Counter of bytes received by the client",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		prefetchedMessages: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Name:        "pulsar_client_consumer_prefetched_messages",
			Help:        "Number of messages currently sitting in the consumer pre-fetch queue",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		prefetchedBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Name:        "pulsar_client_consumer_prefetched_bytes",
			Help:        "Total number of bytes currently sitting in the consumer pre-fetch queue",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		acksCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name:        "pulsar_client_consumer_acks",
			Help:        "Counter of messages acked by client",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		nacksCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name:        "pulsar_client_consumer_nacks",
			Help:        "Counter of messages nacked by client",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		dlqCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name:        "pulsar_client_consumer_dlq_messages",
			Help:        "Counter of messages sent to Dead letter queue",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		processingTime: prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Name:        "pulsar_client_consumer_processing_time_seconds",
			Help:        "Time it takes for application to process messages",
			Buckets:     []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		readersOpened: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name:        "pulsar_client_readers_opened",
			Help:        "Counter of readers created by the client",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
		readersClosed: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name:        "pulsar_client_readers_closed",
			Help:        "Counter of readers closed by the client",
			ConstLabels: constLabels,
-		}, topicLabelNames),
+		}, metricsLevelLabels),
ConnectionsOpened: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_connections_opened",
@@ -465,16 +480,22 @@ func NewMetricsProvider(userDefinedLabels map[string]string) *Metrics {
return metrics
}
-func (mp *Metrics) GetTopicMetrics(t string) *TopicMetrics {
+func (mp *Metrics) GetLeveledMetrics(t string) *LeveledMetrics {
+ labels := make(map[string]string, 3)
tn, _ := ParseTopicName(t)
topic := TopicNameWithoutPartitionPart(tn)
- labels := map[string]string{
- "pulsar_tenant": tn.Tenant,
- "pulsar_namespace": tn.Namespace,
- "topic": topic,
- }
-
- tm := &TopicMetrics{
+ switch mp.metricsLevel {
+ case 4:
+ labels["topic"] = topic
+ fallthrough
+ case 3:
+ labels["pulsar_namespace"] = tn.Namespace
+ fallthrough
+ case 2:
+ labels["pulsar_tenant"] = tn.Tenant
+ }
+
+ lm := &LeveledMetrics{
MessagesPublished: mp.messagesPublished.With(labels),
BytesPublished: mp.bytesPublished.With(labels),
MessagesPending: mp.messagesPending.With(labels),
@@ -503,7 +524,7 @@ func (mp *Metrics) GetTopicMetrics(t string) *TopicMetrics {
ReadersClosed: mp.readersClosed.With(labels),
}
- return tm
+ return lm
}
func mergeMaps(a, b map[string]string) map[string]string {
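
The fallthrough chain in GetLeveledMetrics makes each cardinality level a strict superset of the level below it. A standalone sketch of the resulting label sets (the helper name and example values are illustrative, not from the commit):

package main

import "fmt"

// labelsForLevel reproduces the fallthrough logic above: topic level (4)
// sets all three labels, namespace level (3) drops the topic, tenant
// level (2) keeps only the tenant, and none (1) sets nothing.
func labelsForLevel(level int, tenant, namespace, topic string) map[string]string {
	labels := make(map[string]string, 3)
	switch level {
	case 4:
		labels["topic"] = topic
		fallthrough
	case 3:
		labels["pulsar_namespace"] = namespace
		fallthrough
	case 2:
		labels["pulsar_tenant"] = tenant
	}
	return labels
}

func main() {
	// e.g. persistent://public/default/my-topic at namespace cardinality:
	fmt.Println(labelsForLevel(3, "public", "default", "my-topic"))
	// Output: map[pulsar_namespace:default pulsar_tenant:public]
}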
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 20e8d3d..48e2aa4 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -57,7 +57,7 @@ type producer struct {
closeOnce sync.Once
stopDiscovery func()
log log.Logger
- metrics *internal.TopicMetrics
+ metrics *internal.LeveledMetrics
}
func getHashingFunction(s HashingScheme) func(string) uint32 {
@@ -97,7 +97,7 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
topic: options.Topic,
client: client,
		log:     client.log.SubLogger(log.Fields{"topic": options.Topic}),
- metrics: client.metrics.GetTopicMetrics(options.Topic),
+ metrics: client.metrics.GetLeveledMetrics(options.Topic),
}
if options.Interceptors == nil {
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index e273021..e1762f9 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -84,13 +84,13 @@ type partitionProducer struct {
lastSequenceID int64
schemaInfo *SchemaInfo
partitionIdx int32
- metrics *internal.TopicMetrics
+ metrics *internal.LeveledMetrics
epoch uint64
}
func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int,
- metrics *internal.TopicMetrics) (
+ metrics *internal.LeveledMetrics) (
*partitionProducer, error) {
var batchingMaxPublishDelay time.Duration
if options.BatchingMaxPublishDelay != 0 {
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 0e63f8b..c854325 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -38,7 +38,7 @@ type reader struct {
messageCh chan ConsumerMessage
lastMessageInBroker trackingMessageID
log log.Logger
- metrics *internal.TopicMetrics
+ metrics *internal.LeveledMetrics
}
func newReader(client *client, options ReaderOptions) (Reader, error) {
@@ -96,7 +96,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
client: client,
messageCh: make(chan ConsumerMessage),
		log:       client.log.SubLogger(log.Fields{"topic": options.Topic}),
- metrics: client.metrics.GetTopicMetrics(options.Topic),
+ metrics: client.metrics.GetLeveledMetrics(options.Topic),
}
	// Provide dummy dlq router with no dlq policy