This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new a4e3a341 [YUNIKORN-970] Add queue metrics with queue names as tags (#641)
a4e3a341 is described below

commit a4e3a341ad748e812adc8af1c44097405f7da9d1
Author: William Tom <[email protected]>
AuthorDate: Sun Nov 19 05:26:56 2023 +0100

    [YUNIKORN-970] Add queue metrics with queue names as tags (#641)
    
    Closes: #641
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/metrics/queue.go      | 100 +++++++++++++++++++++++++++++++++-------------
 pkg/metrics/queue_test.go |  69 +++++++++++++++++++++++++++++---
 2 files changed, 136 insertions(+), 33 deletions(-)

diff --git a/pkg/metrics/queue.go b/pkg/metrics/queue.go
index cb227cf4..08ff05ce 100644
--- a/pkg/metrics/queue.go
+++ b/pkg/metrics/queue.go
@@ -28,9 +28,13 @@ import (
 
 // QueueMetrics to declare queue metrics
 type QueueMetrics struct {
-       appMetrics       *prometheus.GaugeVec
-       containerMetrics *prometheus.CounterVec
-       ResourceMetrics  *prometheus.GaugeVec
+       appMetricsLabel *prometheus.GaugeVec
+       // Deprecated - To be removed in 1.7.0. Replaced with queue label Metrics
+       appMetricsSubsystem  *prometheus.GaugeVec
+       containerMetrics     *prometheus.CounterVec
+       resourceMetricsLabel *prometheus.GaugeVec
+       // Deprecated - To be removed in 1.7.0. Replaced with queue label Metrics
+       resourceMetricsSubsystem *prometheus.GaugeVec
 }
 
 // InitQueueMetrics to initialize queue metrics
@@ -39,7 +43,15 @@ func InitQueueMetrics(name string) CoreQueueMetrics {
 
        replaceStr := formatMetricName(name)
 
-       q.appMetrics = prometheus.NewGaugeVec(
+       q.appMetricsLabel = prometheus.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Namespace:   Namespace,
+                       Name:        "queue_app",
+                       ConstLabels: prometheus.Labels{"queue": name},
+                       Help:        "Queue application metrics. State of the application includes `running`.",
+               }, []string{"state"})
+
+       q.appMetricsSubsystem = prometheus.NewGaugeVec(
                prometheus.GaugeOpts{
                        Namespace: Namespace,
                        Subsystem: replaceStr,
@@ -55,7 +67,15 @@ func InitQueueMetrics(name string) CoreQueueMetrics {
                        Help:      "Queue container metrics. State of the attempt includes `allocated`, `released`.",
                }, []string{"state"})
 
-       q.ResourceMetrics = prometheus.NewGaugeVec(
+       q.resourceMetricsLabel = prometheus.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Namespace:   Namespace,
+                       Name:        "queue_resource",
+                       ConstLabels: prometheus.Labels{"queue": name},
+                       Help:        "Queue resource metrics. State of the resource includes `guaranteed`, `max`, `allocated`, `pending`, `preempting`.",
+               }, []string{"state", "resource"})
+
+       q.resourceMetricsSubsystem = prometheus.NewGaugeVec(
                prometheus.GaugeOpts{
                        Namespace: Namespace,
                        Subsystem: replaceStr,
@@ -64,9 +84,11 @@ func InitQueueMetrics(name string) CoreQueueMetrics {
                }, []string{"state", "resource"})
 
        var queueMetricsList = []prometheus.Collector{
-               q.appMetrics,
+               q.appMetricsLabel,
+               q.appMetricsSubsystem,
                q.containerMetrics,
-               q.ResourceMetrics,
+               q.resourceMetricsLabel,
+               q.resourceMetricsSubsystem,
        }
 
        // Register the metrics
@@ -82,22 +104,44 @@ func InitQueueMetrics(name string) CoreQueueMetrics {
        return q
 }
 
+func (m *QueueMetrics) incQueueApplications(state string) {
+       m.appMetricsLabel.With(prometheus.Labels{"state": state}).Inc()
+       m.appMetricsSubsystem.With(prometheus.Labels{"state": state}).Inc()
+}
+
+func (m *QueueMetrics) decQueueApplications(state string) {
+       m.appMetricsLabel.With(prometheus.Labels{"state": state}).Dec()
+       m.appMetricsSubsystem.With(prometheus.Labels{"state": state}).Dec()
+}
+
+func (m *QueueMetrics) addQueueResource(state string, resourceName string, value float64) {
+       m.resourceMetricsLabel.With(prometheus.Labels{"state": state, "resource": resourceName}).Add(value)
+       m.resourceMetricsSubsystem.With(prometheus.Labels{"state": state, "resource": resourceName}).Add(value)
+}
+
+func (m *QueueMetrics) setQueueResource(state string, resourceName string, value float64) {
+       m.resourceMetricsLabel.With(prometheus.Labels{"state": state, "resource": resourceName}).Set(value)
+       m.resourceMetricsSubsystem.With(prometheus.Labels{"state": state, "resource": resourceName}).Set(value)
+}
+
 func (m *QueueMetrics) Reset() {
-       m.appMetrics.Reset()
-       m.ResourceMetrics.Reset()
+       m.appMetricsLabel.Reset()
+       m.appMetricsSubsystem.Reset()
+       m.resourceMetricsLabel.Reset()
+       m.resourceMetricsSubsystem.Reset()
 }
 
 func (m *QueueMetrics) IncQueueApplicationsRunning() {
-       m.appMetrics.With(prometheus.Labels{"state": "running"}).Inc()
+       m.incQueueApplications("running")
 }
 
 func (m *QueueMetrics) DecQueueApplicationsRunning() {
-       m.appMetrics.With(prometheus.Labels{"state": "running"}).Dec()
+       m.decQueueApplications("running")
 }
 
 func (m *QueueMetrics) GetQueueApplicationsRunning() (int, error) {
        metricDto := &dto.Metric{}
-       err := m.appMetrics.With(prometheus.Labels{"state": "running"}).Write(metricDto)
+       err := m.appMetricsLabel.With(prometheus.Labels{"state": "running"}).Write(metricDto)
        if err == nil {
                return int(*metricDto.Gauge.Value), nil
        }
@@ -105,12 +149,12 @@ func (m *QueueMetrics) GetQueueApplicationsRunning() (int, error) {
 }
 
 func (m *QueueMetrics) IncQueueApplicationsAccepted() {
-       m.appMetrics.With(prometheus.Labels{"state": "accepted"}).Inc()
+       m.incQueueApplications("accepted")
 }
 
 func (m *QueueMetrics) GetQueueApplicationsAccepted() (int, error) {
        metricDto := &dto.Metric{}
-       err := m.appMetrics.With(prometheus.Labels{"state": "accepted"}).Write(metricDto)
+       err := m.appMetricsLabel.With(prometheus.Labels{"state": "accepted"}).Write(metricDto)
        if err == nil {
                return int(*metricDto.Gauge.Value), nil
        }
@@ -118,12 +162,12 @@ func (m *QueueMetrics) GetQueueApplicationsAccepted() (int, error) {
 }
 
 func (m *QueueMetrics) IncQueueApplicationsRejected() {
-       m.appMetrics.With(prometheus.Labels{"state": "rejected"}).Inc()
+       m.incQueueApplications("rejected")
 }
 
 func (m *QueueMetrics) GetQueueApplicationsRejected() (int, error) {
        metricDto := &dto.Metric{}
-       err := m.appMetrics.With(prometheus.Labels{"state": "rejected"}).Write(metricDto)
+       err := m.appMetricsLabel.With(prometheus.Labels{"state": "rejected"}).Write(metricDto)
        if err == nil {
                return int(*metricDto.Gauge.Value), nil
        }
@@ -131,12 +175,12 @@ func (m *QueueMetrics) GetQueueApplicationsRejected() (int, error) {
 }
 
 func (m *QueueMetrics) IncQueueApplicationsFailed() {
-       m.appMetrics.With(prometheus.Labels{"state": "failed"}).Inc()
+       m.incQueueApplications("failed")
 }
 
 func (m *QueueMetrics) GetQueueApplicationsFailed() (int, error) {
        metricDto := &dto.Metric{}
-       err := m.appMetrics.With(prometheus.Labels{"state": "failed"}).Write(metricDto)
+       err := m.appMetricsLabel.With(prometheus.Labels{"state": "failed"}).Write(metricDto)
        if err == nil {
                return int(*metricDto.Gauge.Value), nil
        }
@@ -144,12 +188,12 @@ func (m *QueueMetrics) GetQueueApplicationsFailed() (int, error) {
 }
 
 func (m *QueueMetrics) IncQueueApplicationsCompleted() {
-       m.appMetrics.With(prometheus.Labels{"state": "completed"}).Inc()
+       m.incQueueApplications("completed")
 }
 
 func (m *QueueMetrics) GetQueueApplicationsCompleted() (int, error) {
        metricDto := &dto.Metric{}
-       err := m.appMetrics.With(prometheus.Labels{"state": "completed"}).Write(metricDto)
+       err := m.appMetricsLabel.With(prometheus.Labels{"state": "completed"}).Write(metricDto)
        if err == nil {
                return int(*metricDto.Gauge.Value), nil
        }
@@ -169,33 +213,33 @@ func (m *QueueMetrics) AddReleasedContainers(value int) {
 }
 
 func (m *QueueMetrics) SetQueueGuaranteedResourceMetrics(resourceName string, value float64) {
-       m.ResourceMetrics.With(prometheus.Labels{"state": "guaranteed", "resource": resourceName}).Set(value)
+       m.setQueueResource("guaranteed", resourceName, value)
 }
 
 func (m *QueueMetrics) SetQueueMaxResourceMetrics(resourceName string, value float64) {
-       m.ResourceMetrics.With(prometheus.Labels{"state": "max", "resource": resourceName}).Set(value)
+       m.setQueueResource("max", resourceName, value)
 }
 
 func (m *QueueMetrics) SetQueueAllocatedResourceMetrics(resourceName string, value float64) {
-       m.ResourceMetrics.With(prometheus.Labels{"state": "allocated", "resource": resourceName}).Set(value)
+       m.setQueueResource("allocated", resourceName, value)
 }
 
 func (m *QueueMetrics) AddQueueAllocatedResourceMetrics(resourceName string, value float64) {
-       m.ResourceMetrics.With(prometheus.Labels{"state": "allocated", "resource": resourceName}).Add(value)
+       m.addQueueResource("allocated", resourceName, value)
 }
 
 func (m *QueueMetrics) SetQueuePendingResourceMetrics(resourceName string, value float64) {
-       m.ResourceMetrics.With(prometheus.Labels{"state": "pending", "resource": resourceName}).Set(value)
+       m.setQueueResource("pending", resourceName, value)
 }
 
 func (m *QueueMetrics) AddQueuePendingResourceMetrics(resourceName string, value float64) {
-       m.ResourceMetrics.With(prometheus.Labels{"state": "pending", "resource": resourceName}).Add(value)
+       m.addQueueResource("pending", resourceName, value)
 }
 
 func (m *QueueMetrics) SetQueuePreemptingResourceMetrics(resourceName string, value float64) {
-       m.ResourceMetrics.With(prometheus.Labels{"state": "preempting", "resource": resourceName}).Set(value)
+       m.setQueueResource("preempting", resourceName, value)
 }
 
 func (m *QueueMetrics) AddQueuePreemptingResourceMetrics(resourceName string, value float64) {
-       m.ResourceMetrics.With(prometheus.Labels{"state": "preempting", "resource": resourceName}).Add(value)
+       m.addQueueResource("preempting", resourceName, value)
 }
diff --git a/pkg/metrics/queue_test.go b/pkg/metrics/queue_test.go
index 5a3b01b4..419efc82 100644
--- a/pkg/metrics/queue_test.go
+++ b/pkg/metrics/queue_test.go
@@ -139,13 +139,30 @@ func getQueueMetrics() CoreQueueMetrics {
 }
 
 func verifyAppMetrics(t *testing.T, expectedState string) {
+       verifyAppMetricsLabel(t, expectedState)
+       verifyAppMetricsSubsystem(t, expectedState)
+}
+
+func verifyAppMetricsLabel(t *testing.T, expectedState string) {
+       checkFn := func(labels []*dto.LabelPair) {
+               assert.Equal(t, 2, len(labels))
+               assert.Equal(t, "queue", *labels[0].Name)
+               assert.Equal(t, "root.test", *labels[0].Value)
+               assert.Equal(t, "state", *labels[1].Name)
+               assert.Equal(t, expectedState, *labels[1].Value)
+       }
+
+       verifyMetricsLabel(t, checkFn)
+}
+
+func verifyAppMetricsSubsystem(t *testing.T, expectedState string) {
        checkFn := func(labels []*dto.LabelPair) {
                assert.Equal(t, 1, len(labels))
                assert.Equal(t, "state", *labels[0].Name)
                assert.Equal(t, expectedState, *labels[0].Value)
        }
 
-       verifyMetrics(t, checkFn)
+       verifyMetricsSubsytem(t, checkFn)
 }
 
 func verifyContainerMetrics(t *testing.T, expectedState string, value float64) {
@@ -171,6 +188,25 @@ func verifyContainerMetrics(t *testing.T, expectedState string, value float64) {
        assert.Assert(t, checked, "Failed to find metric")
 }
 func verifyResourceMetrics(t *testing.T, expectedState, expectedResource string) {
+       verifyResourceMetricsLabel(t, expectedState, expectedResource)
+       verifyResourceMetricsSubsystem(t, expectedState, expectedResource)
+}
+
+func verifyResourceMetricsLabel(t *testing.T, expectedState, expectedResource string) {
+       checkFn := func(labels []*dto.LabelPair) {
+               assert.Equal(t, 3, len(labels))
+               assert.Equal(t, "queue", *labels[0].Name)
+               assert.Equal(t, "root.test", *labels[0].Value)
+               assert.Equal(t, "resource", *labels[1].Name)
+               assert.Equal(t, expectedResource, *labels[1].Value)
+               assert.Equal(t, "state", *labels[2].Name)
+               assert.Equal(t, expectedState, *labels[2].Value)
+       }
+
+       verifyMetricsLabel(t, checkFn)
+}
+
+func verifyResourceMetricsSubsystem(t *testing.T, expectedState, expectedResource string) {
        checkFn := func(labels []*dto.LabelPair) {
                assert.Equal(t, 2, len(labels))
                assert.Equal(t, "resource", *labels[0].Name)
@@ -179,10 +215,31 @@ func verifyResourceMetrics(t *testing.T, expectedState, expectedResource string)
                assert.Equal(t, expectedState, *labels[1].Value)
        }
 
-       verifyMetrics(t, checkFn)
+       verifyMetricsSubsytem(t, checkFn)
+}
+
+func verifyMetricsLabel(t *testing.T, checkLabel func(label []*dto.LabelPair)) {
+       mfs, err := prometheus.DefaultGatherer.Gather()
+       assert.NilError(t, err)
+
+       var checked bool
+       for _, metric := range mfs {
+               if strings.Contains(metric.GetName(), "yunikorn_queue") {
+                       assert.Equal(t, 1, len(metric.Metric))
+                       assert.Equal(t, dto.MetricType_GAUGE, metric.GetType())
+                       m := metric.Metric[0]
+                       checkLabel(m.Label)
+                       assert.Assert(t, m.Gauge != nil)
+                       assert.Equal(t, float64(1), *m.Gauge.Value)
+                       checked = true
+                       break
+               }
+       }
+
+       assert.Assert(t, checked, "Failed to find metric")
 }
 
-func verifyMetrics(t *testing.T, checkLabel func(label []*dto.LabelPair)) {
+func verifyMetricsSubsytem(t *testing.T, checkLabel func(label []*dto.LabelPair)) {
        mfs, err := prometheus.DefaultGatherer.Gather()
        assert.NilError(t, err)
 
@@ -209,7 +266,9 @@ func unregisterQueueMetrics(t *testing.T) {
                t.Fatalf("Type assertion failed, metrics is not QueueMetrics")
        }
 
-       prometheus.Unregister(qm.appMetrics)
+       prometheus.Unregister(qm.appMetricsLabel)
+       prometheus.Unregister(qm.appMetricsSubsystem)
        prometheus.Unregister(qm.containerMetrics)
-       prometheus.Unregister(qm.ResourceMetrics)
+       prometheus.Unregister(qm.resourceMetricsLabel)
+       prometheus.Unregister(qm.resourceMetricsSubsystem)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to