This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new e295b9b8 [YUNIKORN-3182] Handle queues when some other queue in the
same hierarchy is going through preemption (#1056)
e295b9b8 is described below
commit e295b9b8880bf04cab33e4b5eb3262dd865b8c6c
Author: mani <[email protected]>
AuthorDate: Fri Jan 9 12:52:03 2026 +0100
[YUNIKORN-3182] Handle queues when some other queue in the same hierarchy
is going through preemption (#1056)
Closes: #1056
Signed-off-by: Peter Bacsko <[email protected]>
(cherry picked from commit 2a0db106484ec74edc9524f8927604fe7e2e98b6)
---
pkg/scheduler/objects/queue.go | 31 ++++++++++++-
pkg/scheduler/objects/queue_test.go | 53 ++++++++++++++++++++++
pkg/scheduler/objects/quota_change_preemptor.go | 2 +-
.../objects/quota_change_preemptor_test.go | 19 ++++----
4 files changed, 95 insertions(+), 10 deletions(-)
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 6aeded21..f8044c24 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -2196,12 +2196,41 @@ func (sq *Queue) MarkQuotaChangePreemptionRunning(run bool) {
sq.isQuotaChangePreemptionRunning = run
}
-func (sq *Queue) IsQuotaChangePreemptionRunning() bool {
+func (sq *Queue) isQCPreemptionRunningForParent() bool {
+ if sq == nil {
+ return false
+ }
+ if sq.parent != nil {
+ if sq.parent.isQCPreemptionRunningForParent() {
+ return true
+ }
+ }
+ sq.RLock()
+ defer sq.RUnlock()
+ return sq.isQuotaChangePreemptionRunning
+}
+
+func (sq *Queue) isQCPreemptionRunningForChild() bool {
+ for _, child := range sq.GetCopyOfChildren() {
+ if child.isQCPreemptionRunningForChild() {
+ return true
+ }
+ }
sq.RLock()
defer sq.RUnlock()
return sq.isQuotaChangePreemptionRunning
}
+func (sq *Queue) IsQCPreemptionRunning() bool {
+ if sq.isQCPreemptionRunningForParent() {
+ return true
+ }
+ if sq.isQCPreemptionRunningForChild() {
+ return true
+ }
+ return false
+}
+
func (sq *Queue) GetMaxAppUnschedAskBackoff() uint64 {
sq.RLock()
defer sq.RUnlock()
diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go
index 8b291bc0..520e7e2d 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -3142,3 +3142,56 @@ func TestQueueBackoffProperties(t *testing.T) {
assert.Equal(t, uint64(0), leaf3.GetMaxAppUnschedAskBackoff())
assert.Equal(t, 30*time.Second, leaf3.GetBackoffDelay())
}
+
+func TestQueue_IsQCPreemptionRunning(t *testing.T) {
+ // create the root
+ root, err := createManagedQueueMaxApps(nil, "root", true, nil, 1)
+ assert.NilError(t, err, "queue create failed")
+ parent, err := createManagedQueue(root, "parent", true, nil)
+ assert.NilError(t, err, "failed to create parent queue")
+
+ var leaf, leaf2, leaf11, leaf111 *Queue
+ leaf, err = createManagedQueue(parent, "leaf", true, nil)
+ assert.NilError(t, err, "failed to create leaf queue")
+ leaf2, err = createManagedQueue(parent, "leaf2", false, nil)
+ assert.NilError(t, err, "failed to create leaf2 queue")
+
+ leaf11, err = createManagedQueue(leaf, "leaf11", true, nil)
+ assert.NilError(t, err, "failed to create leaf11 queue")
+
+ leaf111, err = createManagedQueue(leaf11, "leaf111", false, nil)
+ assert.NilError(t, err, "failed to create leaf111 queue")
+
+ // root.parent is running. any queue located in this hierarchy (both upwards and downwards) should return true. All branches of parent should return true.
+ parent.isQuotaChangePreemptionRunning = true
+ assert.Equal(t, parent.IsQCPreemptionRunning(), true)
+ assert.Equal(t, root.IsQCPreemptionRunning(), true)
+ assert.Equal(t, leaf111.IsQCPreemptionRunning(), true)
+ assert.Equal(t, leaf11.IsQCPreemptionRunning(), true)
+ assert.Equal(t, leaf.IsQCPreemptionRunning(), true)
+ assert.Equal(t, leaf2.IsQCPreemptionRunning(), true)
+
+ // reset
+ parent.isQuotaChangePreemptionRunning = false
+
+ // root.parent.leaf111 (leaf queue) is running. any queue located in this hierarchy (upwards) should return true. Other branches of parent should return false.
+ leaf111.isQuotaChangePreemptionRunning = true
+ assert.Equal(t, parent.IsQCPreemptionRunning(), true)
+ assert.Equal(t, root.IsQCPreemptionRunning(), true)
+ assert.Equal(t, leaf111.IsQCPreemptionRunning(), true)
+ assert.Equal(t, leaf11.IsQCPreemptionRunning(), true)
+ assert.Equal(t, leaf.IsQCPreemptionRunning(), true)
+ assert.Equal(t, leaf2.IsQCPreemptionRunning(), false)
+
+ // reset
+ leaf111.isQuotaChangePreemptionRunning = false
+
+ // root.parent.leaf2 (leaf queue) is running. any queue located in this hierarchy (upwards) should return true. Other branches of parent should return false.
+ leaf2.isQuotaChangePreemptionRunning = true
+ assert.Equal(t, parent.IsQCPreemptionRunning(), true)
+ assert.Equal(t, root.IsQCPreemptionRunning(), true)
+ assert.Equal(t, leaf111.IsQCPreemptionRunning(), false)
+ assert.Equal(t, leaf11.IsQCPreemptionRunning(), false)
+ assert.Equal(t, leaf.IsQCPreemptionRunning(), false)
+ assert.Equal(t, leaf2.IsQCPreemptionRunning(), true)
+}
diff --git a/pkg/scheduler/objects/quota_change_preemptor.go b/pkg/scheduler/objects/quota_change_preemptor.go
index 8f8a024d..1d61c7d3 100644
--- a/pkg/scheduler/objects/quota_change_preemptor.go
+++ b/pkg/scheduler/objects/quota_change_preemptor.go
@@ -51,7 +51,7 @@ func NewQuotaChangePreemptor(queue *Queue) *QuotaChangePreemptionContext {
}
func (qcp *QuotaChangePreemptionContext) CheckPreconditions() bool {
- if !qcp.queue.IsManaged() || qcp.queue.IsQuotaChangePreemptionRunning() {
+ if !qcp.queue.IsManaged() || qcp.queue.IsQCPreemptionRunning() {
return false
}
if qcp.maxResource.StrictlyGreaterThanOrEqualsOnlyExisting(qcp.queue.GetAllocatedResource()) {
diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go b/pkg/scheduler/objects/quota_change_preemptor_test.go
index 267e8bbd..1fd3b1ef 100644
--- a/pkg/scheduler/objects/quota_change_preemptor_test.go
+++ b/pkg/scheduler/objects/quota_change_preemptor_test.go
@@ -63,7 +63,6 @@ func TestQuotaChangeCheckPreconditions(t *testing.T) {
Resources: leafRes,
}, parent, false, nil)
assert.NilError(t, err)
- alreadyPreemptionRunning.MarkQuotaChangePreemptionRunning(true)
usageExceededMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
Name: "leaf-usage-exceeded-max",
@@ -91,22 +90,26 @@ func TestQuotaChangeCheckPreconditions(t *testing.T) {
testCases := []struct {
name string
queue *Queue
+ preemptionRunning bool
preconditionResult bool
}{
- {"parent queue", parent, true},
- {"leaf queue", leaf, false},
- {"dynamic leaf queue", dynamicLeaf, false},
- {"leaf queue, already preemption process started or running", alreadyPreemptionRunning, false},
- {"leaf queue, usage exceeded max resources", usageExceededMaxQueue, true},
- {"leaf queue, usage equals max resources", usageEqualsMaxQueue, false},
- {"leaf queue, usage res not matching max resources", usageNotMatchingMaxQueue, false},
+ {"parent queue", parent, false, true},
+ {"leaf queue", leaf, false, false},
+ {"dynamic leaf queue", dynamicLeaf, false, false},
+ {"leaf queue, usage exceeded max resources", usageExceededMaxQueue, false, true},
+ {"leaf queue, usage equals max resources", usageEqualsMaxQueue, false, false},
+ {"leaf queue, already preemption process started or running", alreadyPreemptionRunning, true, false},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
+ tc.queue.MarkQuotaChangePreemptionRunning(tc.preemptionRunning)
preemptor := NewQuotaChangePreemptor(tc.queue)
assert.Equal(t, preemptor.CheckPreconditions(), tc.preconditionResult)
})
}
+ // Since parent's leaf queue "leaf-already-preemption-running" is running, parent preconditions passed earlier should fail now
+ preemptor := NewQuotaChangePreemptor(parent)
+ assert.Equal(t, preemptor.CheckPreconditions(), false)
}
func TestQuotaChangeGetPreemptableResource(t *testing.T) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]