This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 227a240c [YUNIKORN-2855] Handle nil properly in queue metrics (#960)
227a240c is described below

commit 227a240c3dba2a4a949e0723685f487b3702d185
Author: qzhu <[email protected]>
AuthorDate: Tue Sep 17 11:52:49 2024 -0500

    [YUNIKORN-2855] Handle nil properly in queue metrics (#960)
    
    Closes: #960
    
    Signed-off-by: Craig Condit <[email protected]>
---
 pkg/metrics/queue.go           | 29 ++++++++++++++++++++++++-----
 pkg/metrics/queue_test.go      | 19 +++++++++++++++++--
 pkg/scheduler/objects/queue.go | 14 ++++++++------
 3 files changed, 49 insertions(+), 13 deletions(-)

diff --git a/pkg/metrics/queue.go b/pkg/metrics/queue.go
index 1a2c3670..fc118164 100644
--- a/pkg/metrics/queue.go
+++ b/pkg/metrics/queue.go
@@ -23,6 +23,8 @@ import (
        dto "github.com/prometheus/client_model/go"
        "go.uber.org/zap"
 
+       "github.com/apache/yunikorn-core/pkg/common/resources"
+       "github.com/apache/yunikorn-core/pkg/locking"
        "github.com/apache/yunikorn-core/pkg/log"
 )
 
@@ -57,6 +59,9 @@ type QueueMetrics struct {
        resourceMetricsLabel *prometheus.GaugeVec
       // Deprecated - To be removed in 1.7.0. Replaced with queue label Metrics
        resourceMetricsSubsystem *prometheus.GaugeVec
+       // Track known resource types
+       knownResourceTypes map[string]struct{}
+       lock               locking.Mutex
 }
 
 // InitQueueMetrics to initialize queue metrics
@@ -123,6 +128,7 @@ func InitQueueMetrics(name string) *QueueMetrics {
                }
        }
 
+       q.knownResourceTypes = make(map[string]struct{})
        return q
 }
 
@@ -142,10 +148,13 @@ func (m *QueueMetrics) setQueueResource(state string, resourceName string, value
 }
 
 func (m *QueueMetrics) Reset() {
+       m.lock.Lock()
+       defer m.lock.Unlock()
        m.appMetricsLabel.Reset()
        m.appMetricsSubsystem.Reset()
        m.resourceMetricsLabel.Reset()
        m.resourceMetricsSubsystem.Reset()
+       m.knownResourceTypes = make(map[string]struct{})
 }
 
 func (m *QueueMetrics) IncQueueApplicationsRunning() {
@@ -301,12 +310,22 @@ func (m *QueueMetrics) AddReleasedContainers(value int) {
       m.containerMetrics.WithLabelValues(ContainerReleased).Add(float64(value))
 }
 
-func (m *QueueMetrics) SetQueueGuaranteedResourceMetrics(resourceName string, value float64) {
-       m.setQueueResource(QueueGuaranteed, resourceName, value)
-}
+func (m *QueueMetrics) UpdateQueueResourceMetrics(state string, newResources map[string]resources.Quantity) {
+       m.lock.Lock()
+       defer m.lock.Unlock()
+       // Iterate over new resource types and set their values
+       for resourceName, value := range newResources {
+               m.setQueueResource(state, resourceName, float64(value))
+               // Add new resources to the known list
+               m.knownResourceTypes[resourceName] = struct{}{}
+       }
 
-func (m *QueueMetrics) SetQueueMaxResourceMetrics(resourceName string, value float64) {
-       m.setQueueResource(QueueMax, resourceName, value)
+       // Emit old resource types that are missing in the new collection with zero
+       for resourceName := range m.knownResourceTypes {
+               if _, exists := newResources[resourceName]; !exists {
+                       m.setQueueResource(state, resourceName, float64(0))
+               }
+       }
 }
 
 func (m *QueueMetrics) SetQueueAllocatedResourceMetrics(resourceName string, value float64) {
diff --git a/pkg/metrics/queue_test.go b/pkg/metrics/queue_test.go
index ea2376bd..9a967864 100644
--- a/pkg/metrics/queue_test.go
+++ b/pkg/metrics/queue_test.go
@@ -26,6 +26,8 @@ import (
 
        "github.com/prometheus/client_golang/prometheus"
        dto "github.com/prometheus/client_model/go"
+
+       "github.com/apache/yunikorn-core/pkg/common/resources"
 )
 
 var qm *QueueMetrics
@@ -196,16 +198,28 @@ func TestQueueGuaranteedResourceMetrics(t *testing.T) {
        qm = getQueueMetrics()
        defer unregisterQueueMetrics()
 
-       qm.SetQueueGuaranteedResourceMetrics("cpu", 1)
+       qm.UpdateQueueResourceMetrics("guaranteed", map[string]resources.Quantity{
+               "cpu": 1,
+       })
        verifyResourceMetrics(t, "guaranteed", "cpu")
+       assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu": {}})
+
+       qm.UpdateQueueResourceMetrics("guaranteed", map[string]resources.Quantity{"memory": 1})
+       assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu": {}, "memory": {}})
 }
 
 func TestQueueMaxResourceMetrics(t *testing.T) {
        qm = getQueueMetrics()
        defer unregisterQueueMetrics()
 
-       qm.SetQueueMaxResourceMetrics("cpu", 1)
+       qm.UpdateQueueResourceMetrics("max", map[string]resources.Quantity{
+               "cpu": 1,
+       })
        verifyResourceMetrics(t, "max", "cpu")
+       assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu": {}})
+
+       qm.UpdateQueueResourceMetrics("max", map[string]resources.Quantity{"memory": 1})
+       assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu": {}, "memory": {}})
 }
 
 func TestQueueAllocatedResourceMetrics(t *testing.T) {
@@ -364,4 +378,5 @@ func unregisterQueueMetrics() {
        prometheus.Unregister(qm.containerMetrics)
        prometheus.Unregister(qm.resourceMetricsLabel)
        prometheus.Unregister(qm.resourceMetricsSubsystem)
+       qm.knownResourceTypes = make(map[string]struct{})
 }
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 7e21bc8f..6ac9dc16 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -1657,20 +1657,22 @@ func (sq *Queue) SupportTaskGroup() bool {
 
 // updateGuaranteedResourceMetrics updates guaranteed resource metrics.
 func (sq *Queue) updateGuaranteedResourceMetrics() {
+       queueMetrics := metrics.GetQueueMetrics(sq.QueuePath)
+       resourcesToUpdate := map[string]resources.Quantity{}
        if sq.guaranteedResource != nil {
-               for k, v := range sq.guaranteedResource.Resources {
-                       metrics.GetQueueMetrics(sq.QueuePath).SetQueueGuaranteedResourceMetrics(k, float64(v))
-               }
+               resourcesToUpdate = sq.guaranteedResource.Resources
        }
+       queueMetrics.UpdateQueueResourceMetrics(metrics.QueueGuaranteed, resourcesToUpdate)
 }
 
 // updateMaxResourceMetrics updates max resource metrics.
 func (sq *Queue) updateMaxResourceMetrics() {
+       queueMetrics := metrics.GetQueueMetrics(sq.QueuePath)
+       resourcesToUpdate := map[string]resources.Quantity{}
        if sq.maxResource != nil {
-               for k, v := range sq.maxResource.Resources {
-                       metrics.GetQueueMetrics(sq.QueuePath).SetQueueMaxResourceMetrics(k, float64(v))
-               }
+               resourcesToUpdate = sq.maxResource.Resources
        }
+       queueMetrics.UpdateQueueResourceMetrics(metrics.QueueMax, resourcesToUpdate)
 }
 
 // updateAllocatedResourceMetrics updates allocated resource metrics for all queue types.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to