This is an automated email from the ASF dual-hosted git repository.

wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a9d1b54 [YUNIKORN-2031] Update pending resource metric on queues (#673)
5a9d1b54 is described below

commit 5a9d1b542d4d3b26916b10502ba933b812a4245c
Author: Wei Huang <[email protected]>
AuthorDate: Wed Oct 25 09:29:43 2023 +1100

    [YUNIKORN-2031] Update pending resource metric on queues (#673)
    
    Pending resource metrics are not updated consistently and rely on
    updates to other metrics (allocated, preempting) to be kept correct.
    This can lead to stale pending metrics being shown when pending
    resources change without any change to allocated or preempting resources.
    
    Closes: #673
    
    Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
 go.mod                               |  1 +
 pkg/scheduler/objects/application.go |  2 +-
 pkg/scheduler/objects/queue.go       | 27 ++++++++++++++++---
 pkg/scheduler/objects/queue_test.go  | 52 ++++++++++++++++++++++++++++++++++++
 4 files changed, 77 insertions(+), 5 deletions(-)

diff --git a/go.mod b/go.mod
index b74bf299..e639aade 100644
--- a/go.mod
+++ b/go.mod
@@ -45,6 +45,7 @@ require (
        github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
        github.com/beorn7/perks v1.0.1 // indirect
        github.com/cespare/xxhash/v2 v2.2.0 // indirect
+       github.com/davecgh/go-spew v1.1.1 // indirect
        github.com/golang/protobuf v1.5.3 // indirect
        github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
        github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // 
indirect
diff --git a/pkg/scheduler/objects/application.go 
b/pkg/scheduler/objects/application.go
index 84f8f987..57c1165d 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -376,7 +376,7 @@ func (sa *Application) timeoutStateTimer(expectedState 
string, event application
                                sa.notifyRMAllocationReleased(sa.rmID, 
toRelease, si.TerminationType_TIMEOUT, "releasing placeholders on app complete")
                                sa.clearStateTimer()
                        } else {
-                               //nolint: errcheck
+                               // nolint: errcheck
                                _ = sa.HandleApplicationEvent(event)
                        }
                }
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 04d97376..1b76d63b 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -651,6 +651,7 @@ func (sq *Queue) incPendingResource(delta 
*resources.Resource) {
        sq.Lock()
        defer sq.Unlock()
        sq.pending = resources.Add(sq.pending, delta)
+       sq.updatePendingResourceMetrics()
 }
 
 // decPendingResource decrements pending resource of this queue and its 
parents.
@@ -671,6 +672,8 @@ func (sq *Queue) decPendingResource(delta 
*resources.Resource) {
                log.Log(log.SchedQueue).Warn("Pending resources went negative",
                        zap.String("queueName", sq.QueuePath),
                        zap.Error(err))
+       } else {
+               sq.updatePendingResourceMetrics()
        }
 }
 
@@ -1017,7 +1020,7 @@ func (sq *Queue) IncAllocatedResource(alloc 
*resources.Resource, nodeReported bo
        }
        // all OK update this queue
        sq.allocatedResource = newAllocated
-       sq.updateAllocatedAndPendingResourceMetrics()
+       sq.updateAllocatedResourceMetrics()
        return nil
 }
 
@@ -1053,7 +1056,7 @@ func (sq *Queue) DecAllocatedResource(alloc 
*resources.Resource) error {
        }
        // all OK update the queue
        sq.allocatedResource = resources.Sub(sq.allocatedResource, alloc)
-       sq.updateAllocatedAndPendingResourceMetrics()
+       sq.updateAllocatedResourceMetrics()
        return nil
 }
 
@@ -1066,7 +1069,7 @@ func (sq *Queue) IncPreemptingResource(alloc 
*resources.Resource) {
        defer sq.Unlock()
        sq.parent.IncPreemptingResource(alloc)
        sq.preemptingResource = resources.Add(sq.preemptingResource, alloc)
-       sq.updateAllocatedAndPendingResourceMetrics()
+       sq.updatePreemptingResourceMetrics()
 }
 
 // DecPreemptingResource decrements the preempting resources for this queue 
(recursively).
@@ -1078,7 +1081,7 @@ func (sq *Queue) DecPreemptingResource(alloc 
*resources.Resource) {
        defer sq.Unlock()
        sq.parent.DecPreemptingResource(alloc)
        sq.preemptingResource = resources.Sub(sq.preemptingResource, alloc)
-       sq.updateAllocatedAndPendingResourceMetrics()
+       sq.updatePreemptingResourceMetrics()
 }
 
 func (sq *Queue) IsPrioritySortEnabled() bool {
@@ -1553,13 +1556,29 @@ func (sq *Queue) updateMaxResourceMetrics() {
 }
 
 // updateAllocatedAndPendingResourceMetrics updates allocated and pending 
resource metrics for all queue types.
+// Deprecated: use specific metric update function for efficiency.
 func (sq *Queue) updateAllocatedAndPendingResourceMetrics() {
+       sq.updateAllocatedResourceMetrics()
+       sq.updatePendingResourceMetrics()
+       sq.updatePreemptingResourceMetrics()
+}
+
+// updateAllocatedResourceMetrics updates allocated resource metrics for all 
queue types.
+func (sq *Queue) updateAllocatedResourceMetrics() {
        for k, v := range sq.allocatedResource.Resources {
                
metrics.GetQueueMetrics(sq.QueuePath).SetQueueAllocatedResourceMetrics(k, 
float64(v))
        }
+}
+
+// updatePendingResourceMetrics updates pending resource metrics for all queue 
types.
+func (sq *Queue) updatePendingResourceMetrics() {
        for k, v := range sq.pending.Resources {
                
metrics.GetQueueMetrics(sq.QueuePath).SetQueuePendingResourceMetrics(k, 
float64(v))
        }
+}
+
+// updatePendingResourceMetrics updates preempting resource metrics for all 
queue types.
+func (sq *Queue) updatePreemptingResourceMetrics() {
        for k, v := range sq.preemptingResource.Resources {
                
metrics.GetQueueMetrics(sq.QueuePath).SetQueuePreemptingResourceMetrics(k, 
float64(v))
        }
diff --git a/pkg/scheduler/objects/queue_test.go 
b/pkg/scheduler/objects/queue_test.go
index f6f3003b..909c86b1 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -23,15 +23,19 @@ import (
        "reflect"
        "sort"
        "strconv"
+       "strings"
        "testing"
        "time"
 
+       "github.com/prometheus/client_golang/prometheus"
+       promtu "github.com/prometheus/client_golang/prometheus/testutil"
        "gotest.tools/v3/assert"
 
        "github.com/apache/yunikorn-core/pkg/common"
        "github.com/apache/yunikorn-core/pkg/common/configs"
        "github.com/apache/yunikorn-core/pkg/common/resources"
        "github.com/apache/yunikorn-core/pkg/events"
+       "github.com/apache/yunikorn-core/pkg/metrics"
        "github.com/apache/yunikorn-core/pkg/scheduler/objects/template"
        "github.com/apache/yunikorn-core/pkg/scheduler/policies"
        siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
@@ -266,6 +270,11 @@ func TestPriorityCalc(t *testing.T) {
 }
 
 func TestPendingCalc(t *testing.T) {
+       // Reset existing metric storage; otherwise this unit test would get 
metrics populated by other UTs.
+       // In long run, to make the metrics code more testable, we should pass 
instantiable Metrics obj to Queue
+       // instead of using a global Metrics obj at pkg/metrics/init.go.
+       metrics.Reset()
+
        // create the root
        root, err := createRootQueue(nil)
        assert.NilError(t, err, "queue create failed")
@@ -284,6 +293,16 @@ func TestPendingCalc(t *testing.T) {
        if !resources.Equals(leaf.pending, allocRes) {
                t.Errorf("leaf queue pending allocation failed to increment 
expected %v, got %v", allocRes, leaf.pending)
        }
+       metrics := []string{"yunikorn_root_queue_resource", 
"yunikorn_root_leaf_queue_resource"}
+       want := concatQueueResourceMetric(metrics, []string{`
+yunikorn_root_queue_resource{resource="memory",state="pending"} 100
+yunikorn_root_queue_resource{resource="vcores",state="pending"} 10
+`, `
+yunikorn_root_leaf_queue_resource{resource="memory",state="pending"} 100
+yunikorn_root_leaf_queue_resource{resource="vcores",state="pending"} 10
+`},
+       )
+       assert.NilError(t, promtu.GatherAndCompare(prometheus.DefaultGatherer, 
strings.NewReader(want), metrics...), "unexpected metrics")
        leaf.decPendingResource(allocRes)
        if !resources.IsZero(root.pending) {
                t.Errorf("root queue pending allocation failed to decrement 
expected 0, got %v", root.pending)
@@ -291,6 +310,15 @@ func TestPendingCalc(t *testing.T) {
        if !resources.IsZero(leaf.pending) {
                t.Errorf("leaf queue pending allocation failed to decrement 
expected 0, got %v", leaf.pending)
        }
+       want = concatQueueResourceMetric(metrics, []string{`
+yunikorn_root_queue_resource{resource="memory",state="pending"} 0
+yunikorn_root_queue_resource{resource="vcores",state="pending"} 0
+`, `
+yunikorn_root_leaf_queue_resource{resource="memory",state="pending"} 0
+yunikorn_root_leaf_queue_resource{resource="vcores",state="pending"} 0
+`},
+       )
+       assert.NilError(t, promtu.GatherAndCompare(prometheus.DefaultGatherer, 
strings.NewReader(want), metrics...), "unexpected metrics")
        // Not allowed to go negative: both will be zero after this
        newRes := resources.Multiply(allocRes, 2)
        root.pending = newRes
@@ -302,6 +330,30 @@ func TestPendingCalc(t *testing.T) {
        if !resources.IsZero(leaf.GetPendingResource()) {
                t.Errorf("leaf queue pending allocation should have failed to 
decrement expected zero, got %v", leaf.pending)
        }
+       want = concatQueueResourceMetric(metrics, []string{`
+yunikorn_root_queue_resource{resource="memory",state="pending"} 0
+yunikorn_root_queue_resource{resource="vcores",state="pending"} 0
+`, `
+yunikorn_root_leaf_queue_resource{resource="memory",state="pending"} 0
+yunikorn_root_leaf_queue_resource{resource="vcores",state="pending"} 0
+`},
+       )
+       assert.NilError(t, promtu.GatherAndCompare(prometheus.DefaultGatherer, 
strings.NewReader(want), metrics...), "unexpected metrics")
+}
+
+const (
+       QueueResourceMetricHelp = "# HELP %v Queue resource metrics. State of 
the resource includes `guaranteed`, `max`, `allocated`, `pending`, 
`preempting`."
+       QueueResourceMetricType = "# TYPE %v gauge"
+)
+
+func concatQueueResourceMetric(metricNames, metricVals []string) string {
+       var out string
+       for i, metricName := range metricNames {
+               out = out + fmt.Sprintf(QueueResourceMetricHelp, metricName) + 
"\n"
+               out = out + fmt.Sprintf(QueueResourceMetricType, metricName) + 
"\n"
+               out += strings.TrimLeft(metricVals[i], "\n")
+       }
+       return out
 }
 
 func TestGetChildQueueInfo(t *testing.T) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to