This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new a4e3a341 [YUNIKORN-970] Add queue metrics with queue names as tags (#641)
a4e3a341 is described below
commit a4e3a341ad748e812adc8af1c44097405f7da9d1
Author: William Tom <[email protected]>
AuthorDate: Sun Nov 19 05:26:56 2023 +0100
[YUNIKORN-970] Add queue metrics with queue names as tags (#641)
Closes: #641
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/metrics/queue.go | 100 +++++++++++++++++++++++++++++++++-------------
pkg/metrics/queue_test.go | 69 +++++++++++++++++++++++++++++---
2 files changed, 136 insertions(+), 33 deletions(-)
diff --git a/pkg/metrics/queue.go b/pkg/metrics/queue.go
index cb227cf4..08ff05ce 100644
--- a/pkg/metrics/queue.go
+++ b/pkg/metrics/queue.go
@@ -28,9 +28,13 @@ import (
// QueueMetrics to declare queue metrics
type QueueMetrics struct {
- appMetrics *prometheus.GaugeVec
- containerMetrics *prometheus.CounterVec
- ResourceMetrics *prometheus.GaugeVec
+ appMetricsLabel *prometheus.GaugeVec
+ // Deprecated - To be removed in 1.7.0. Replaced with queue label
Metrics
+ appMetricsSubsystem *prometheus.GaugeVec
+ containerMetrics *prometheus.CounterVec
+ resourceMetricsLabel *prometheus.GaugeVec
+ // Deprecated - To be removed in 1.7.0. Replaced with queue label
Metrics
+ resourceMetricsSubsystem *prometheus.GaugeVec
}
// InitQueueMetrics to initialize queue metrics
@@ -39,7 +43,15 @@ func InitQueueMetrics(name string) CoreQueueMetrics {
replaceStr := formatMetricName(name)
- q.appMetrics = prometheus.NewGaugeVec(
+ q.appMetricsLabel = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: Namespace,
+ Name: "queue_app",
+ ConstLabels: prometheus.Labels{"queue": name},
+ Help: "Queue application metrics. State of the
application includes `running`.",
+ }, []string{"state"})
+
+ q.appMetricsSubsystem = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: replaceStr,
@@ -55,7 +67,15 @@ func InitQueueMetrics(name string) CoreQueueMetrics {
Help: "Queue container metrics. State of the
attempt includes `allocated`, `released`.",
}, []string{"state"})
- q.ResourceMetrics = prometheus.NewGaugeVec(
+ q.resourceMetricsLabel = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: Namespace,
+ Name: "queue_resource",
+ ConstLabels: prometheus.Labels{"queue": name},
+ Help: "Queue resource metrics. State of the
resource includes `guaranteed`, `max`, `allocated`, `pending`, `preempting`.",
+ }, []string{"state", "resource"})
+
+ q.resourceMetricsSubsystem = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: replaceStr,
@@ -64,9 +84,11 @@ func InitQueueMetrics(name string) CoreQueueMetrics {
}, []string{"state", "resource"})
var queueMetricsList = []prometheus.Collector{
- q.appMetrics,
+ q.appMetricsLabel,
+ q.appMetricsSubsystem,
q.containerMetrics,
- q.ResourceMetrics,
+ q.resourceMetricsLabel,
+ q.resourceMetricsSubsystem,
}
// Register the metrics
@@ -82,22 +104,44 @@ func InitQueueMetrics(name string) CoreQueueMetrics {
return q
}
+func (m *QueueMetrics) incQueueApplications(state string) {
+ m.appMetricsLabel.With(prometheus.Labels{"state": state}).Inc()
+ m.appMetricsSubsystem.With(prometheus.Labels{"state": state}).Inc()
+}
+
+func (m *QueueMetrics) decQueueApplications(state string) {
+ m.appMetricsLabel.With(prometheus.Labels{"state": state}).Dec()
+ m.appMetricsSubsystem.With(prometheus.Labels{"state": state}).Dec()
+}
+
+func (m *QueueMetrics) addQueueResource(state string, resourceName string,
value float64) {
+ m.resourceMetricsLabel.With(prometheus.Labels{"state": state,
"resource": resourceName}).Add(value)
+ m.resourceMetricsSubsystem.With(prometheus.Labels{"state": state,
"resource": resourceName}).Add(value)
+}
+
+func (m *QueueMetrics) setQueueResource(state string, resourceName string,
value float64) {
+ m.resourceMetricsLabel.With(prometheus.Labels{"state": state,
"resource": resourceName}).Set(value)
+ m.resourceMetricsSubsystem.With(prometheus.Labels{"state": state,
"resource": resourceName}).Set(value)
+}
+
func (m *QueueMetrics) Reset() {
- m.appMetrics.Reset()
- m.ResourceMetrics.Reset()
+ m.appMetricsLabel.Reset()
+ m.appMetricsSubsystem.Reset()
+ m.resourceMetricsLabel.Reset()
+ m.resourceMetricsSubsystem.Reset()
}
func (m *QueueMetrics) IncQueueApplicationsRunning() {
- m.appMetrics.With(prometheus.Labels{"state": "running"}).Inc()
+ m.incQueueApplications("running")
}
func (m *QueueMetrics) DecQueueApplicationsRunning() {
- m.appMetrics.With(prometheus.Labels{"state": "running"}).Dec()
+ m.decQueueApplications("running")
}
func (m *QueueMetrics) GetQueueApplicationsRunning() (int, error) {
metricDto := &dto.Metric{}
- err := m.appMetrics.With(prometheus.Labels{"state":
"running"}).Write(metricDto)
+ err := m.appMetricsLabel.With(prometheus.Labels{"state":
"running"}).Write(metricDto)
if err == nil {
return int(*metricDto.Gauge.Value), nil
}
@@ -105,12 +149,12 @@ func (m *QueueMetrics) GetQueueApplicationsRunning()
(int, error) {
}
func (m *QueueMetrics) IncQueueApplicationsAccepted() {
- m.appMetrics.With(prometheus.Labels{"state": "accepted"}).Inc()
+ m.incQueueApplications("accepted")
}
func (m *QueueMetrics) GetQueueApplicationsAccepted() (int, error) {
metricDto := &dto.Metric{}
- err := m.appMetrics.With(prometheus.Labels{"state":
"accepted"}).Write(metricDto)
+ err := m.appMetricsLabel.With(prometheus.Labels{"state":
"accepted"}).Write(metricDto)
if err == nil {
return int(*metricDto.Gauge.Value), nil
}
@@ -118,12 +162,12 @@ func (m *QueueMetrics) GetQueueApplicationsAccepted()
(int, error) {
}
func (m *QueueMetrics) IncQueueApplicationsRejected() {
- m.appMetrics.With(prometheus.Labels{"state": "rejected"}).Inc()
+ m.incQueueApplications("rejected")
}
func (m *QueueMetrics) GetQueueApplicationsRejected() (int, error) {
metricDto := &dto.Metric{}
- err := m.appMetrics.With(prometheus.Labels{"state":
"rejected"}).Write(metricDto)
+ err := m.appMetricsLabel.With(prometheus.Labels{"state":
"rejected"}).Write(metricDto)
if err == nil {
return int(*metricDto.Gauge.Value), nil
}
@@ -131,12 +175,12 @@ func (m *QueueMetrics) GetQueueApplicationsRejected()
(int, error) {
}
func (m *QueueMetrics) IncQueueApplicationsFailed() {
- m.appMetrics.With(prometheus.Labels{"state": "failed"}).Inc()
+ m.incQueueApplications("failed")
}
func (m *QueueMetrics) GetQueueApplicationsFailed() (int, error) {
metricDto := &dto.Metric{}
- err := m.appMetrics.With(prometheus.Labels{"state":
"failed"}).Write(metricDto)
+ err := m.appMetricsLabel.With(prometheus.Labels{"state":
"failed"}).Write(metricDto)
if err == nil {
return int(*metricDto.Gauge.Value), nil
}
@@ -144,12 +188,12 @@ func (m *QueueMetrics) GetQueueApplicationsFailed() (int,
error) {
}
func (m *QueueMetrics) IncQueueApplicationsCompleted() {
- m.appMetrics.With(prometheus.Labels{"state": "completed"}).Inc()
+ m.incQueueApplications("completed")
}
func (m *QueueMetrics) GetQueueApplicationsCompleted() (int, error) {
metricDto := &dto.Metric{}
- err := m.appMetrics.With(prometheus.Labels{"state":
"completed"}).Write(metricDto)
+ err := m.appMetricsLabel.With(prometheus.Labels{"state":
"completed"}).Write(metricDto)
if err == nil {
return int(*metricDto.Gauge.Value), nil
}
@@ -169,33 +213,33 @@ func (m *QueueMetrics) AddReleasedContainers(value int) {
}
func (m *QueueMetrics) SetQueueGuaranteedResourceMetrics(resourceName string,
value float64) {
- m.ResourceMetrics.With(prometheus.Labels{"state": "guaranteed",
"resource": resourceName}).Set(value)
+ m.setQueueResource("guaranteed", resourceName, value)
}
func (m *QueueMetrics) SetQueueMaxResourceMetrics(resourceName string, value
float64) {
- m.ResourceMetrics.With(prometheus.Labels{"state": "max", "resource":
resourceName}).Set(value)
+ m.setQueueResource("max", resourceName, value)
}
func (m *QueueMetrics) SetQueueAllocatedResourceMetrics(resourceName string,
value float64) {
- m.ResourceMetrics.With(prometheus.Labels{"state": "allocated",
"resource": resourceName}).Set(value)
+ m.setQueueResource("allocated", resourceName, value)
}
func (m *QueueMetrics) AddQueueAllocatedResourceMetrics(resourceName string,
value float64) {
- m.ResourceMetrics.With(prometheus.Labels{"state": "allocated",
"resource": resourceName}).Add(value)
+ m.addQueueResource("allocated", resourceName, value)
}
func (m *QueueMetrics) SetQueuePendingResourceMetrics(resourceName string,
value float64) {
- m.ResourceMetrics.With(prometheus.Labels{"state": "pending",
"resource": resourceName}).Set(value)
+ m.setQueueResource("pending", resourceName, value)
}
func (m *QueueMetrics) AddQueuePendingResourceMetrics(resourceName string,
value float64) {
- m.ResourceMetrics.With(prometheus.Labels{"state": "pending",
"resource": resourceName}).Add(value)
+ m.addQueueResource("pending", resourceName, value)
}
func (m *QueueMetrics) SetQueuePreemptingResourceMetrics(resourceName string,
value float64) {
- m.ResourceMetrics.With(prometheus.Labels{"state": "preempting",
"resource": resourceName}).Set(value)
+ m.setQueueResource("preempting", resourceName, value)
}
func (m *QueueMetrics) AddQueuePreemptingResourceMetrics(resourceName string,
value float64) {
- m.ResourceMetrics.With(prometheus.Labels{"state": "preempting",
"resource": resourceName}).Add(value)
+ m.addQueueResource("preempting", resourceName, value)
}
diff --git a/pkg/metrics/queue_test.go b/pkg/metrics/queue_test.go
index 5a3b01b4..419efc82 100644
--- a/pkg/metrics/queue_test.go
+++ b/pkg/metrics/queue_test.go
@@ -139,13 +139,30 @@ func getQueueMetrics() CoreQueueMetrics {
}
func verifyAppMetrics(t *testing.T, expectedState string) {
+ verifyAppMetricsLabel(t, expectedState)
+ verifyAppMetricsSubsystem(t, expectedState)
+}
+
+func verifyAppMetricsLabel(t *testing.T, expectedState string) {
+ checkFn := func(labels []*dto.LabelPair) {
+ assert.Equal(t, 2, len(labels))
+ assert.Equal(t, "queue", *labels[0].Name)
+ assert.Equal(t, "root.test", *labels[0].Value)
+ assert.Equal(t, "state", *labels[1].Name)
+ assert.Equal(t, expectedState, *labels[1].Value)
+ }
+
+ verifyMetricsLabel(t, checkFn)
+}
+
+func verifyAppMetricsSubsystem(t *testing.T, expectedState string) {
checkFn := func(labels []*dto.LabelPair) {
assert.Equal(t, 1, len(labels))
assert.Equal(t, "state", *labels[0].Name)
assert.Equal(t, expectedState, *labels[0].Value)
}
- verifyMetrics(t, checkFn)
+ verifyMetricsSubsytem(t, checkFn)
}
func verifyContainerMetrics(t *testing.T, expectedState string, value float64)
{
@@ -171,6 +188,25 @@ func verifyContainerMetrics(t *testing.T, expectedState
string, value float64) {
assert.Assert(t, checked, "Failed to find metric")
}
func verifyResourceMetrics(t *testing.T, expectedState, expectedResource
string) {
+ verifyResourceMetricsLabel(t, expectedState, expectedResource)
+ verifyResourceMetricsSubsystem(t, expectedState, expectedResource)
+}
+
+func verifyResourceMetricsLabel(t *testing.T, expectedState, expectedResource
string) {
+ checkFn := func(labels []*dto.LabelPair) {
+ assert.Equal(t, 3, len(labels))
+ assert.Equal(t, "queue", *labels[0].Name)
+ assert.Equal(t, "root.test", *labels[0].Value)
+ assert.Equal(t, "resource", *labels[1].Name)
+ assert.Equal(t, expectedResource, *labels[1].Value)
+ assert.Equal(t, "state", *labels[2].Name)
+ assert.Equal(t, expectedState, *labels[2].Value)
+ }
+
+ verifyMetricsLabel(t, checkFn)
+}
+
+func verifyResourceMetricsSubsystem(t *testing.T, expectedState,
expectedResource string) {
checkFn := func(labels []*dto.LabelPair) {
assert.Equal(t, 2, len(labels))
assert.Equal(t, "resource", *labels[0].Name)
@@ -179,10 +215,31 @@ func verifyResourceMetrics(t *testing.T, expectedState,
expectedResource string)
assert.Equal(t, expectedState, *labels[1].Value)
}
- verifyMetrics(t, checkFn)
+ verifyMetricsSubsytem(t, checkFn)
+}
+
+func verifyMetricsLabel(t *testing.T, checkLabel func(label []*dto.LabelPair))
{
+ mfs, err := prometheus.DefaultGatherer.Gather()
+ assert.NilError(t, err)
+
+ var checked bool
+ for _, metric := range mfs {
+ if strings.Contains(metric.GetName(), "yunikorn_queue") {
+ assert.Equal(t, 1, len(metric.Metric))
+ assert.Equal(t, dto.MetricType_GAUGE, metric.GetType())
+ m := metric.Metric[0]
+ checkLabel(m.Label)
+ assert.Assert(t, m.Gauge != nil)
+ assert.Equal(t, float64(1), *m.Gauge.Value)
+ checked = true
+ break
+ }
+ }
+
+ assert.Assert(t, checked, "Failed to find metric")
}
-func verifyMetrics(t *testing.T, checkLabel func(label []*dto.LabelPair)) {
+func verifyMetricsSubsytem(t *testing.T, checkLabel func(label
[]*dto.LabelPair)) {
mfs, err := prometheus.DefaultGatherer.Gather()
assert.NilError(t, err)
@@ -209,7 +266,9 @@ func unregisterQueueMetrics(t *testing.T) {
t.Fatalf("Type assertion failed, metrics is not QueueMetrics")
}
- prometheus.Unregister(qm.appMetrics)
+ prometheus.Unregister(qm.appMetricsLabel)
+ prometheus.Unregister(qm.appMetricsSubsystem)
prometheus.Unregister(qm.containerMetrics)
- prometheus.Unregister(qm.ResourceMetrics)
+ prometheus.Unregister(qm.resourceMetricsLabel)
+ prometheus.Unregister(qm.resourceMetricsSubsystem)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]