This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 5a9d1b54 [YUNIKORN-2031] Update pending resource metric on queues
(#673)
5a9d1b54 is described below
commit 5a9d1b542d4d3b26916b10502ba933b812a4245c
Author: Wei Huang <[email protected]>
AuthorDate: Wed Oct 25 09:29:43 2023 +1100
[YUNIKORN-2031] Update pending resource metric on queues (#673)
Pending resource metrics are not updated consistently and rely on
updates from other metrics to be kept correct. This can leave a stale
pending metric in place until an unrelated queue metric (allocated or
preempting) happens to be updated.
Closes: #673
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
go.mod | 1 +
pkg/scheduler/objects/application.go | 2 +-
pkg/scheduler/objects/queue.go | 27 ++++++++++++++++---
pkg/scheduler/objects/queue_test.go | 52 ++++++++++++++++++++++++++++++++++++
4 files changed, 77 insertions(+), 5 deletions(-)
diff --git a/go.mod b/go.mod
index b74bf299..e639aade 100644
--- a/go.mod
+++ b/go.mod
@@ -45,6 +45,7 @@ require (
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
+ github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index 84f8f987..57c1165d 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -376,7 +376,7 @@ func (sa *Application) timeoutStateTimer(expectedState
string, event application
sa.notifyRMAllocationReleased(sa.rmID,
toRelease, si.TerminationType_TIMEOUT, "releasing placeholders on app complete")
sa.clearStateTimer()
} else {
- //nolint: errcheck
+ //nolint:errcheck
_ = sa.HandleApplicationEvent(event)
}
}
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 04d97376..1b76d63b 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -651,6 +651,7 @@ func (sq *Queue) incPendingResource(delta
*resources.Resource) {
sq.Lock()
defer sq.Unlock()
sq.pending = resources.Add(sq.pending, delta)
+ sq.updatePendingResourceMetrics()
}
// decPendingResource decrements pending resource of this queue and its
parents.
@@ -671,6 +672,8 @@ func (sq *Queue) decPendingResource(delta
*resources.Resource) {
log.Log(log.SchedQueue).Warn("Pending resources went negative",
zap.String("queueName", sq.QueuePath),
zap.Error(err))
}
+ // refresh unconditionally so the metric always mirrors sq.pending, also after the warning above
+ sq.updatePendingResourceMetrics()
}
@@ -1017,7 +1020,7 @@ func (sq *Queue) IncAllocatedResource(alloc
*resources.Resource, nodeReported bo
}
// all OK update this queue
sq.allocatedResource = newAllocated
- sq.updateAllocatedAndPendingResourceMetrics()
+ sq.updateAllocatedResourceMetrics()
return nil
}
@@ -1053,7 +1056,7 @@ func (sq *Queue) DecAllocatedResource(alloc
*resources.Resource) error {
}
// all OK update the queue
sq.allocatedResource = resources.Sub(sq.allocatedResource, alloc)
- sq.updateAllocatedAndPendingResourceMetrics()
+ sq.updateAllocatedResourceMetrics()
return nil
}
@@ -1066,7 +1069,7 @@ func (sq *Queue) IncPreemptingResource(alloc
*resources.Resource) {
defer sq.Unlock()
sq.parent.IncPreemptingResource(alloc)
sq.preemptingResource = resources.Add(sq.preemptingResource, alloc)
- sq.updateAllocatedAndPendingResourceMetrics()
+ sq.updatePreemptingResourceMetrics()
}
// DecPreemptingResource decrements the preempting resources for this queue
(recursively).
@@ -1078,7 +1081,7 @@ func (sq *Queue) DecPreemptingResource(alloc
*resources.Resource) {
defer sq.Unlock()
sq.parent.DecPreemptingResource(alloc)
sq.preemptingResource = resources.Sub(sq.preemptingResource, alloc)
- sq.updateAllocatedAndPendingResourceMetrics()
+ sq.updatePreemptingResourceMetrics()
}
func (sq *Queue) IsPrioritySortEnabled() bool {
@@ -1553,13 +1556,29 @@ func (sq *Queue) updateMaxResourceMetrics() {
}
// updateAllocatedAndPendingResourceMetrics updates allocated and pending
resource metrics for all queue types.
+// Deprecated: use specific metric update function for efficiency.
func (sq *Queue) updateAllocatedAndPendingResourceMetrics() {
+ sq.updateAllocatedResourceMetrics()
+ sq.updatePendingResourceMetrics()
+ sq.updatePreemptingResourceMetrics()
+}
+
+// updateAllocatedResourceMetrics updates allocated resource metrics for all
queue types.
+func (sq *Queue) updateAllocatedResourceMetrics() {
for k, v := range sq.allocatedResource.Resources {
metrics.GetQueueMetrics(sq.QueuePath).SetQueueAllocatedResourceMetrics(k,
float64(v))
}
+}
+
+// updatePendingResourceMetrics updates pending resource metrics for all queue
types.
+func (sq *Queue) updatePendingResourceMetrics() {
for k, v := range sq.pending.Resources {
metrics.GetQueueMetrics(sq.QueuePath).SetQueuePendingResourceMetrics(k,
float64(v))
}
+}
+
+// updatePreemptingResourceMetrics updates preempting resource metrics for all
queue types.
+func (sq *Queue) updatePreemptingResourceMetrics() {
for k, v := range sq.preemptingResource.Resources {
metrics.GetQueueMetrics(sq.QueuePath).SetQueuePreemptingResourceMetrics(k,
float64(v))
}
diff --git a/pkg/scheduler/objects/queue_test.go
b/pkg/scheduler/objects/queue_test.go
index f6f3003b..909c86b1 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -23,15 +23,19 @@ import (
"reflect"
"sort"
"strconv"
+ "strings"
"testing"
"time"
+ "github.com/prometheus/client_golang/prometheus"
+ promtu "github.com/prometheus/client_golang/prometheus/testutil"
"gotest.tools/v3/assert"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/events"
+ "github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/scheduler/objects/template"
"github.com/apache/yunikorn-core/pkg/scheduler/policies"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
@@ -266,6 +270,11 @@ func TestPriorityCalc(t *testing.T) {
}
func TestPendingCalc(t *testing.T) {
+ // Reset existing metric storage; otherwise this unit test would get
metrics populated by other UTs.
+ // In long run, to make the metrics code more testable, we should pass
instantiable Metrics obj to Queue
+ // instead of using a global Metrics obj at pkg/metrics/init.go.
+ metrics.Reset()
+
// create the root
root, err := createRootQueue(nil)
assert.NilError(t, err, "queue create failed")
@@ -284,6 +293,16 @@ func TestPendingCalc(t *testing.T) {
if !resources.Equals(leaf.pending, allocRes) {
t.Errorf("leaf queue pending allocation failed to increment
expected %v, got %v", allocRes, leaf.pending)
}
+ metricNames := []string{"yunikorn_root_queue_resource",
"yunikorn_root_leaf_queue_resource"}
+ want := concatQueueResourceMetric(metricNames, []string{`
+yunikorn_root_queue_resource{resource="memory",state="pending"} 100
+yunikorn_root_queue_resource{resource="vcores",state="pending"} 10
+`, `
+yunikorn_root_leaf_queue_resource{resource="memory",state="pending"} 100
+yunikorn_root_leaf_queue_resource{resource="vcores",state="pending"} 10
+`},
+ )
+ assert.NilError(t, promtu.GatherAndCompare(prometheus.DefaultGatherer,
strings.NewReader(want), metricNames...), "unexpected metrics")
leaf.decPendingResource(allocRes)
if !resources.IsZero(root.pending) {
t.Errorf("root queue pending allocation failed to decrement
expected 0, got %v", root.pending)
@@ -291,6 +310,15 @@ func TestPendingCalc(t *testing.T) {
if !resources.IsZero(leaf.pending) {
t.Errorf("leaf queue pending allocation failed to decrement
expected 0, got %v", leaf.pending)
}
+ want = concatQueueResourceMetric(metricNames, []string{`
+yunikorn_root_queue_resource{resource="memory",state="pending"} 0
+yunikorn_root_queue_resource{resource="vcores",state="pending"} 0
+`, `
+yunikorn_root_leaf_queue_resource{resource="memory",state="pending"} 0
+yunikorn_root_leaf_queue_resource{resource="vcores",state="pending"} 0
+`},
+ )
+ assert.NilError(t, promtu.GatherAndCompare(prometheus.DefaultGatherer,
strings.NewReader(want), metricNames...), "unexpected metrics")
// Not allowed to go negative: both will be zero after this
newRes := resources.Multiply(allocRes, 2)
root.pending = newRes
@@ -302,6 +330,30 @@ func TestPendingCalc(t *testing.T) {
if !resources.IsZero(leaf.GetPendingResource()) {
t.Errorf("leaf queue pending allocation should have failed to
decrement expected zero, got %v", leaf.pending)
}
+ want = concatQueueResourceMetric(metricNames, []string{`
+yunikorn_root_queue_resource{resource="memory",state="pending"} 0
+yunikorn_root_queue_resource{resource="vcores",state="pending"} 0
+`, `
+yunikorn_root_leaf_queue_resource{resource="memory",state="pending"} 0
+yunikorn_root_leaf_queue_resource{resource="vcores",state="pending"} 0
+`},
+ )
+ assert.NilError(t, promtu.GatherAndCompare(prometheus.DefaultGatherer,
strings.NewReader(want), metricNames...), "unexpected metrics")
+}
+
+const (
+ QueueResourceMetricHelp = "# HELP %v Queue resource metrics. State of
the resource includes `guaranteed`, `max`, `allocated`, `pending`,
`preempting`."
+ QueueResourceMetricType = "# TYPE %v gauge"
+)
+
+func concatQueueResourceMetric(metricNames, metricVals []string) string {
+ var out strings.Builder
+ for i, metricName := range metricNames {
+ fmt.Fprintf(&out, QueueResourceMetricHelp+"\n", metricName)
+ fmt.Fprintf(&out, QueueResourceMetricType+"\n", metricName)
+ out.WriteString(strings.TrimLeft(metricVals[i], "\n"))
+ }
+ return out.String()
}
func TestGetChildQueueInfo(t *testing.T) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]