This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 227a240c [YUNIKORN-2855] Handle nil properly in queue metrics (#960)
227a240c is described below
commit 227a240c3dba2a4a949e0723685f487b3702d185
Author: qzhu <[email protected]>
AuthorDate: Tue Sep 17 11:52:49 2024 -0500
[YUNIKORN-2855] Handle nil properly in queue metrics (#960)
Closes: #960
Signed-off-by: Craig Condit <[email protected]>
---
pkg/metrics/queue.go | 29 ++++++++++++++++++++++++-----
pkg/metrics/queue_test.go | 19 +++++++++++++++++--
pkg/scheduler/objects/queue.go | 14 ++++++++------
3 files changed, 49 insertions(+), 13 deletions(-)
diff --git a/pkg/metrics/queue.go b/pkg/metrics/queue.go
index 1a2c3670..fc118164 100644
--- a/pkg/metrics/queue.go
+++ b/pkg/metrics/queue.go
@@ -23,6 +23,8 @@ import (
dto "github.com/prometheus/client_model/go"
"go.uber.org/zap"
+ "github.com/apache/yunikorn-core/pkg/common/resources"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
)
@@ -57,6 +59,9 @@ type QueueMetrics struct {
resourceMetricsLabel *prometheus.GaugeVec
// Deprecated - To be removed in 1.7.0. Replaced with queue label
Metrics
resourceMetricsSubsystem *prometheus.GaugeVec
+ // Track known resource types
+ knownResourceTypes map[string]struct{}
+ lock locking.Mutex
}
// InitQueueMetrics to initialize queue metrics
@@ -123,6 +128,7 @@ func InitQueueMetrics(name string) *QueueMetrics {
}
}
+ q.knownResourceTypes = make(map[string]struct{})
return q
}
@@ -142,10 +148,13 @@ func (m *QueueMetrics) setQueueResource(state string,
resourceName string, value
}
func (m *QueueMetrics) Reset() {
+ m.lock.Lock()
+ defer m.lock.Unlock()
m.appMetricsLabel.Reset()
m.appMetricsSubsystem.Reset()
m.resourceMetricsLabel.Reset()
m.resourceMetricsSubsystem.Reset()
+ m.knownResourceTypes = make(map[string]struct{})
}
func (m *QueueMetrics) IncQueueApplicationsRunning() {
@@ -301,12 +310,22 @@ func (m *QueueMetrics) AddReleasedContainers(value int) {
m.containerMetrics.WithLabelValues(ContainerReleased).Add(float64(value))
}
-func (m *QueueMetrics) SetQueueGuaranteedResourceMetrics(resourceName string,
value float64) {
- m.setQueueResource(QueueGuaranteed, resourceName, value)
-}
+func (m *QueueMetrics) UpdateQueueResourceMetrics(state string, newResources
map[string]resources.Quantity) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ // Iterate over new resource types and set their values
+ for resourceName, value := range newResources {
+ m.setQueueResource(state, resourceName, float64(value))
+ // Add new resources to the known list
+ m.knownResourceTypes[resourceName] = struct{}{}
+ }
-func (m *QueueMetrics) SetQueueMaxResourceMetrics(resourceName string, value
float64) {
- m.setQueueResource(QueueMax, resourceName, value)
+ // Emit old resource types that are missing in the new collection with
zero
+ for resourceName := range m.knownResourceTypes {
+ if _, exists := newResources[resourceName]; !exists {
+ m.setQueueResource(state, resourceName, float64(0))
+ }
+ }
}
func (m *QueueMetrics) SetQueueAllocatedResourceMetrics(resourceName string,
value float64) {
diff --git a/pkg/metrics/queue_test.go b/pkg/metrics/queue_test.go
index ea2376bd..9a967864 100644
--- a/pkg/metrics/queue_test.go
+++ b/pkg/metrics/queue_test.go
@@ -26,6 +26,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
+
+ "github.com/apache/yunikorn-core/pkg/common/resources"
)
var qm *QueueMetrics
@@ -196,16 +198,28 @@ func TestQueueGuaranteedResourceMetrics(t *testing.T) {
qm = getQueueMetrics()
defer unregisterQueueMetrics()
- qm.SetQueueGuaranteedResourceMetrics("cpu", 1)
+ qm.UpdateQueueResourceMetrics("guaranteed",
map[string]resources.Quantity{
+ "cpu": 1,
+ })
verifyResourceMetrics(t, "guaranteed", "cpu")
+ assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu":
{}})
+
+ qm.UpdateQueueResourceMetrics("guaranteed",
map[string]resources.Quantity{"memory": 1})
+ assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu":
{}, "memory": {}})
}
func TestQueueMaxResourceMetrics(t *testing.T) {
qm = getQueueMetrics()
defer unregisterQueueMetrics()
- qm.SetQueueMaxResourceMetrics("cpu", 1)
+ qm.UpdateQueueResourceMetrics("max", map[string]resources.Quantity{
+ "cpu": 1,
+ })
verifyResourceMetrics(t, "max", "cpu")
+ assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu":
{}})
+
+ qm.UpdateQueueResourceMetrics("max",
map[string]resources.Quantity{"memory": 1})
+ assert.DeepEqual(t, qm.knownResourceTypes, map[string]struct{}{"cpu":
{}, "memory": {}})
}
func TestQueueAllocatedResourceMetrics(t *testing.T) {
@@ -364,4 +378,5 @@ func unregisterQueueMetrics() {
prometheus.Unregister(qm.containerMetrics)
prometheus.Unregister(qm.resourceMetricsLabel)
prometheus.Unregister(qm.resourceMetricsSubsystem)
+ qm.knownResourceTypes = make(map[string]struct{})
}
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 7e21bc8f..6ac9dc16 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -1657,20 +1657,22 @@ func (sq *Queue) SupportTaskGroup() bool {
// updateGuaranteedResourceMetrics updates guaranteed resource metrics.
func (sq *Queue) updateGuaranteedResourceMetrics() {
+ queueMetrics := metrics.GetQueueMetrics(sq.QueuePath)
+ resourcesToUpdate := map[string]resources.Quantity{}
if sq.guaranteedResource != nil {
- for k, v := range sq.guaranteedResource.Resources {
-
metrics.GetQueueMetrics(sq.QueuePath).SetQueueGuaranteedResourceMetrics(k,
float64(v))
- }
+ resourcesToUpdate = sq.guaranteedResource.Resources
}
+ queueMetrics.UpdateQueueResourceMetrics(metrics.QueueGuaranteed,
resourcesToUpdate)
}
// updateMaxResourceMetrics updates max resource metrics.
func (sq *Queue) updateMaxResourceMetrics() {
+ queueMetrics := metrics.GetQueueMetrics(sq.QueuePath)
+ resourcesToUpdate := map[string]resources.Quantity{}
if sq.maxResource != nil {
- for k, v := range sq.maxResource.Resources {
-
metrics.GetQueueMetrics(sq.QueuePath).SetQueueMaxResourceMetrics(k, float64(v))
- }
+ resourcesToUpdate = sq.maxResource.Resources
}
+ queueMetrics.UpdateQueueResourceMetrics(metrics.QueueMax,
resourcesToUpdate)
}
// updateAllocatedResourceMetrics updates allocated resource metrics for all
queue types.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]