This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 00744231 [YUNIKORN-1951] Ensure that queues with no guaranteed
resources do not trigger preemption (#639)
00744231 is described below
commit 00744231e6778171463314931a0ca54dd4e060d4
Author: Craig Condit <[email protected]>
AuthorDate: Mon Sep 11 09:11:29 2023 -0500
[YUNIKORN-1951] Ensure that queues with no guaranteed resources do not
trigger preemption (#639)
Closes: #639
---
pkg/common/resources/resources.go | 5 ++++
pkg/common/resources/resources_test.go | 18 ++++++++++++
pkg/scheduler/objects/preemption.go | 7 +++++
pkg/scheduler/objects/preemption_test.go | 48 ++++++++++++++++++++++++++++++++
pkg/scheduler/objects/queue.go | 1 +
5 files changed, 79 insertions(+)
diff --git a/pkg/common/resources/resources.go b/pkg/common/resources/resources.go
index dc061e65..2aee8c04 100644
--- a/pkg/common/resources/resources.go
+++ b/pkg/common/resources/resources.go
@@ -907,6 +907,11 @@ func (r *Resource) HasNegativeValue() bool {
return false
}
+// IsEmpty returns true if the resource is nil or has no component resources specified.
+func (r *Resource) IsEmpty() bool {
+ return r == nil || len(r.Resources) == 0
+}
+
// Returns a new resource with the largest value for each quantity in the resources
// If either resource passed in is nil a zero resource is returned
func ComponentWiseMax(left, right *Resource) *Resource {
diff --git a/pkg/common/resources/resources_test.go b/pkg/common/resources/resources_test.go
index 0bb27229..8460017c 100644
--- a/pkg/common/resources/resources_test.go
+++ b/pkg/common/resources/resources_test.go
@@ -1683,3 +1683,21 @@ func TestHasNegativeValue(t *testing.T) {
})
}
}
+
+func TestIsEmpty(t *testing.T) {
+ testCases := []struct {
+ name string
+ input *Resource
+ expectedResult bool
+ }{
+ {"Nil resource", nil, true},
+ {"Empty resource", NewResource(), true},
+ {"Positive value", NewResourceFromMap(map[string]Quantity{common.Memory: 100}), false},
+ {"Negative value", NewResourceFromMap(map[string]Quantity{common.Memory: -100}), false},
+ }
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ assert.Equal(t, tc.expectedResult, tc.input.IsEmpty())
+ })
+ }
+}
diff --git a/pkg/scheduler/objects/preemption.go b/pkg/scheduler/objects/preemption.go
index de003647..879800f8 100644
--- a/pkg/scheduler/objects/preemption.go
+++ b/pkg/scheduler/objects/preemption.go
@@ -63,6 +63,7 @@ type Preemptor struct {
type QueuePreemptionSnapshot struct {
Parent *QueuePreemptionSnapshot // snapshot of parent queue
QueuePath string // fully qualified path to queue
+ Leaf bool // true if queue is a leaf queue
AllocatedResource *resources.Resource // allocated resources
PreemptingResource *resources.Resource // resources currently flagged for preemption
MaxResource *resources.Resource // maximum resources for this queue
@@ -683,6 +684,7 @@ func (qps *QueuePreemptionSnapshot) Duplicate(copy map[string]*QueuePreemptionSn
snapshot := &QueuePreemptionSnapshot{
Parent: parent,
QueuePath: qps.QueuePath,
+ Leaf: qps.Leaf,
AllocatedResource: qps.AllocatedResource.Clone(),
PreemptingResource: qps.PreemptingResource.Clone(),
MaxResource: qps.MaxResource.Clone(),
@@ -723,6 +725,11 @@ func (qps *QueuePreemptionSnapshot) IsWithinGuaranteedResource() bool {
return false
}
guaranteed := qps.GetGuaranteedResource()
+
+ // if this is a leaf queue and we have not found any guaranteed resources, then we are never within guaranteed usage
+ if qps.Leaf && guaranteed.IsEmpty() {
+ return false
+ }
max := qps.GetMaxResource()
absGuaranteed := resources.ComponentWiseMinPermissive(guaranteed, max)
used := resources.Sub(qps.AllocatedResource, qps.PreemptingResource)
diff --git a/pkg/scheduler/objects/preemption_test.go b/pkg/scheduler/objects/preemption_test.go
index 90a3a24c..a7641878 100644
--- a/pkg/scheduler/objects/preemption_test.go
+++ b/pkg/scheduler/objects/preemption_test.go
@@ -128,6 +128,54 @@ func TestCheckPreemptionQueueGuarantees(t *testing.T) {
assert.Assert(t, !preemptor.checkPreemptionQueueGuarantees(), "queue guarantees did not fail")
}
+func TestCheckPreemptionQueueGuaranteesWithNoGuaranteedResources(t *testing.T) {
+ var tests = []struct {
+ testName string
+ expected bool
+ parentGuaranteed map[string]string
+ childGuaranteed map[string]string
+ }{
+ {"NoGuaranteed", false, map[string]string{}, map[string]string{}},
+ {"ParentGuaranteed", true, map[string]string{"first": "10"}, map[string]string{}},
+ {"ChildGuaranteed", true, map[string]string{}, map[string]string{"first": "5"}},
+ {"BothGuaranteed", true, map[string]string{"first": "10"}, map[string]string{"first": "5"}},
+ }
+ for _, tt := range tests {
+ t.Run(tt.testName, func(t *testing.T) {
+ node := newNode("node1", map[string]resources.Quantity{"first": 20})
+ iterator := getNodeIteratorFn(node)
+ rootQ, err := createRootQueue(map[string]string{"first": "20"})
+ assert.NilError(t, err)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true, map[string]string{"first": "20"}, tt.parentGuaranteed)
+ assert.NilError(t, err)
+ childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false, map[string]string{"first": "10"}, map[string]string{"first": "5"})
+ assert.NilError(t, err)
+ childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false, map[string]string{"first": "10"}, tt.childGuaranteed)
+ assert.NilError(t, err)
+ app1 := newApplication(appID1, "default", "root.parent.child1")
+ app1.SetQueue(childQ1)
+ childQ1.applications[appID1] = app1
+ ask1 := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+ assert.NilError(t, app1.AddAllocationAsk(ask1))
+ ask2 := newAllocationAsk("alloc2", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+ assert.NilError(t, app1.AddAllocationAsk(ask2))
+ app1.AddAllocation(NewAllocation("alloc1", "node1", ask1))
+ app1.AddAllocation(NewAllocation("alloc2", "node1", ask2))
+ assert.NilError(t, childQ1.IncAllocatedResource(ask1.GetAllocatedResource(), false))
+ assert.NilError(t, childQ1.IncAllocatedResource(ask2.GetAllocatedResource(), false))
+ app2 := newApplication(appID2, "default", "root.parent.child2")
+ app2.SetQueue(childQ2)
+ childQ2.applications[appID2] = app2
+ ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+ assert.NilError(t, app2.AddAllocationAsk(ask3))
+ childQ2.incPendingResource(ask3.GetAllocatedResource())
+ headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+ preemptor := NewPreemptor(app2, headRoom, 30*time.Second, ask3, iterator(), false)
+ assert.Equal(t, tt.expected, preemptor.checkPreemptionQueueGuarantees(), "unexpected result")
+ })
+ }
+}
+
func TestTryPreemption(t *testing.T) {
node := newNode("node1", map[string]resources.Quantity{"first": 10, "pods": 5})
iterator := getNodeIteratorFn(node)
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 511488f2..7eb01b89 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -1722,6 +1722,7 @@ func (sq *Queue) createPreemptionSnapshot(cache map[string]*QueuePreemptionSnaps
snapshot = &QueuePreemptionSnapshot{
Parent: parentSnapshot,
QueuePath: sq.QueuePath,
+ Leaf: sq.isLeaf,
AllocatedResource: sq.allocatedResource.Clone(),
PreemptingResource: sq.preemptingResource.Clone(),
MaxResource: sq.maxResource.Clone(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]