This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 6bd254cf [YUNIKORN-3140] Quota change through preemption config changes (#1034)
6bd254cf is described below
commit 6bd254cf1972d2635d0087a390969454e243fd6c
Author: mani <[email protected]>
AuthorDate: Wed Nov 5 10:29:08 2025 +0530
[YUNIKORN-3140] Quota change through preemption config changes (#1034)
Closes: #1034
Signed-off-by: mani <[email protected]>
---
pkg/common/configs/config.go | 5 ++
pkg/common/configs/configvalidator.go | 3 +
pkg/common/configs/configvalidator_test.go | 28 ++++++++
pkg/scheduler/objects/queue.go | 100 ++++++++++++++++++++---------
pkg/scheduler/objects/queue_test.go | 98 +++++++++++++++++++++++++++-
5 files changed, 202 insertions(+), 32 deletions(-)
diff --git a/pkg/common/configs/config.go b/pkg/common/configs/config.go
index 32a4ef2d..792f833b 100644
--- a/pkg/common/configs/config.go
+++ b/pkg/common/configs/config.go
@@ -84,6 +84,11 @@ type QueueConfig struct {
ChildTemplate ChildTemplate `yaml:",omitempty" json:",omitempty"`
Queues []QueueConfig `yaml:",omitempty" json:",omitempty"`
Limits []Limit `yaml:",omitempty" json:",omitempty"`
+ Preemption Preemption `yaml:",omitempty" json:",omitempty"`
+}
+
+type Preemption struct {
+ Delay uint64 `yaml:",omitempty" json:",omitempty"`
}
type ChildTemplate struct {
diff --git a/pkg/common/configs/configvalidator.go b/pkg/common/configs/configvalidator.go
index 76b537f7..c311823b 100644
--- a/pkg/common/configs/configvalidator.go
+++ b/pkg/common/configs/configvalidator.go
@@ -662,6 +662,9 @@ func checkQueues(queue *QueueConfig, level int) error {
return fmt.Errorf("duplicate child name found with name '%s', level %d", child.Name, level)
}
queueMap[strings.ToLower(child.Name)] = true
+ if queue.Preemption.Delay != 0 && queue.Preemption.Delay <= 60 {
+ return fmt.Errorf("invalid preemption delay %d, must be greater than 60 seconds", queue.Preemption.Delay)
+ }
}
// recurse into the depth if this level passed
diff --git a/pkg/common/configs/configvalidator_test.go b/pkg/common/configs/configvalidator_test.go
index 4d0c6495..e62f4b3f 100644
--- a/pkg/common/configs/configvalidator_test.go
+++ b/pkg/common/configs/configvalidator_test.go
@@ -2327,6 +2327,34 @@ func TestCheckQueues(t *testing.T) { //nolint:funlen
assert.Equal(t, 2, len(q.Queues), "Expected two queues")
},
},
+ {
+ name: "Invalid Preemption delay for leaf queue",
+ queue: &QueueConfig{
+ Name: "root",
+ Queues: []QueueConfig{
+ {Name: "leaf",
+ Preemption: Preemption{
+ Delay: 10,
+ },
+ },
+ },
+ Preemption: Preemption{
+ Delay: 10,
+ },
+ },
+ level: 1,
+ expectedErrorMsg: "invalid preemption delay 10, must be greater than 60 seconds",
+ },
+ {
+ name: "Setting Preemption delay on root queue would be ignored",
+ queue: &QueueConfig{
+ Name: "root",
+ Preemption: Preemption{
+ Delay: 10,
+ },
+ },
+ level: 0,
+ },
}
for _, tc := range testCases {
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index a81b1bfa..db6b972a 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -74,21 +74,22 @@ type Queue struct {
// The queue properties should be treated as immutable the value is a merge of the
// parent properties with the config for this queue only manipulated during creation
// of the queue or via a queue configuration update.
- properties map[string]string
- adminACL security.ACL // admin ACL
- submitACL security.ACL // submit ACL
- maxResource *resources.Resource // When not set, max = nil
- guaranteedResource *resources.Resource // When not set, Guaranteed == 0
- isLeaf bool // this is a leaf queue or not (i.e. parent)
- isManaged bool // queue is part of the config, not auto created
- stateMachine *fsm.FSM // the state of the queue for scheduling
- stateTime time.Time // last time the state was updated (needed for cleanup)
- maxRunningApps uint64
- runningApps uint64
- allocatingAcceptedApps map[string]bool
- template *template.Template
- queueEvents *schedEvt.QueueEvents
- appQueueMapping *AppQueueMapping // appID mapping to queues
+ properties map[string]string
+ adminACL security.ACL // admin ACL
+ submitACL security.ACL // submit ACL
+ maxResource *resources.Resource // When not set, max = nil
+ guaranteedResource *resources.Resource // When not set, Guaranteed == 0
+ isLeaf bool // this is a leaf queue or not (i.e. parent)
+ isManaged bool // queue is part of the config, not auto created
+ stateMachine *fsm.FSM // the state of the queue for scheduling
+ stateTime time.Time // last time the state was updated (needed for cleanup)
+ maxRunningApps uint64
+ runningApps uint64
+ allocatingAcceptedApps map[string]bool
+ template *template.Template
+ queueEvents *schedEvt.QueueEvents
+ appQueueMapping *AppQueueMapping // appID mapping to queues
+ quotaChangePreemptionDelay uint64
locking.RWMutex
}
@@ -96,21 +97,22 @@ type Queue struct {
// newBlankQueue creates a new empty queue objects with all values initialised.
func newBlankQueue() *Queue {
return &Queue{
- children: make(map[string]*Queue),
- childPriorities: make(map[string]int32),
- applications: make(map[string]*Application),
- appPriorities: make(map[string]int32),
- reservedApps: make(map[string]int),
- allocatingAcceptedApps: make(map[string]bool),
- properties: make(map[string]string),
- stateMachine: NewObjectState(),
- allocatedResource: resources.NewResource(),
- preemptingResource: resources.NewResource(),
- pending: resources.NewResource(),
- currentPriority: configs.MinPriority,
- prioritySortEnabled: true,
- preemptionDelay: configs.DefaultPreemptionDelay,
- preemptionPolicy: policies.DefaultPreemptionPolicy,
+ children: make(map[string]*Queue),
+ childPriorities: make(map[string]int32),
+ applications: make(map[string]*Application),
+ appPriorities: make(map[string]int32),
+ reservedApps: make(map[string]int),
+ allocatingAcceptedApps: make(map[string]bool),
+ properties: make(map[string]string),
+ stateMachine: NewObjectState(),
+ allocatedResource: resources.NewResource(),
+ preemptingResource: resources.NewResource(),
+ pending: resources.NewResource(),
+ currentPriority: configs.MinPriority,
+ prioritySortEnabled: true,
+ preemptionDelay: configs.DefaultPreemptionDelay,
+ preemptionPolicy: policies.DefaultPreemptionPolicy,
+ quotaChangePreemptionDelay: 0,
}
}
@@ -155,7 +157,6 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent *Queue, silence bool, a
zap.String("queueName", sq.QueuePath))
sq.queueEvents.SendNewQueueEvent(sq.QueuePath, sq.isManaged)
}
-
return sq, nil
}
@@ -372,17 +373,54 @@ func (sq *Queue) applyConf(conf configs.QueueConfig, silence bool) error {
// Load the max & guaranteed resources and maxApps for all but the root queue
if sq.Name != configs.RootQueue {
+ oldMaxResource := sq.maxResource
if err = sq.setResourcesFromConf(conf.Resources); err != nil {
return err
}
sq.maxRunningApps = conf.MaxApplications
sq.updateMaxRunningAppsMetrics()
+ sq.setPreemptionSettings(oldMaxResource, conf)
}
sq.properties = conf.Properties
return nil
}
+// setPreemptionSettings Set Quota change preemption settings
+func (sq *Queue) setPreemptionSettings(oldMaxResource *resources.Resource, conf configs.QueueConfig) {
+ newMaxResource, err := resources.NewResourceFromConf(conf.Resources.Max)
+ if err != nil {
+ log.Log(log.SchedQueue).Error("parsing failed on max resources this should not happen",
+ zap.String("queue", sq.QueuePath),
+ zap.Error(err))
+ return
+ }
+
+ switch {
+ // Set max res earlier but not now
+ case resources.IsZero(newMaxResource) && !resources.IsZero(oldMaxResource):
+ sq.quotaChangePreemptionDelay = 0
+ // Set max res now but not earlier
+ case !resources.IsZero(newMaxResource) && resources.IsZero(oldMaxResource) && conf.Preemption.Delay != 0:
+ sq.quotaChangePreemptionDelay = conf.Preemption.Delay
+ // Set max res earlier and now as well
+ default:
+ switch {
+ // Quota decrease
+ case resources.StrictlyGreaterThan(oldMaxResource, newMaxResource) && conf.Preemption.Delay != 0:
+ sq.quotaChangePreemptionDelay = conf.Preemption.Delay
+ // Quota increase
+ case resources.StrictlyGreaterThan(newMaxResource, oldMaxResource) && conf.Preemption.Delay != 0:
+ sq.quotaChangePreemptionDelay = 0
+ // Quota remains as is but delay has changed
+ case resources.Equals(oldMaxResource, newMaxResource) && conf.Preemption.Delay != 0 && sq.quotaChangePreemptionDelay != conf.Preemption.Delay:
+ sq.quotaChangePreemptionDelay = conf.Preemption.Delay
+ default:
+ // noop
+ }
+ }
+}
+
// setResourcesFromConf sets the maxResource and guaranteedResource of the queue from the config.
func (sq *Queue) setResourcesFromConf(resource configs.Resources) error {
maxResource, err := resources.NewResourceFromConf(resource.Max)
diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go
index c6c2594e..67d5a4c2 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -2246,6 +2246,88 @@ func TestApplyConf(t *testing.T) {
assert.Equal(t, root.maxRunningApps, uint64(0))
}
+func TestQuotaChangePreemptionSettings(t *testing.T) {
+ root, err := createManagedQueueWithProps(nil, "root", true, nil, nil)
+ assert.NilError(t, err, "failed to create basic queue: %v", err)
+
+ parent, err := createManagedQueueWithProps(root, "parent", false, nil, nil)
+ assert.NilError(t, err, "failed to create basic queue: %v", err)
+ testCases := []struct {
+ name string
+ conf configs.QueueConfig
+ expectedDelay uint64
+ }{{"first time queue setup without delay", configs.QueueConfig{
+ Resources: configs.Resources{
+ Max: getResourceConf(),
+ Guaranteed: getResourceConf(),
+ },
+ }, 0},
+ {"clearing max resources", configs.QueueConfig{
+ Resources: configs.Resources{
+ Max: nil,
+ Guaranteed: nil,
+ },
+ }, 0},
+ {"first time queue setup with delay", configs.QueueConfig{
+ Resources: configs.Resources{
+ Max: getResourceConf(),
+ Guaranteed: getResourceConf(),
+ },
+ Preemption: configs.Preemption{
+ Delay: 100,
+ },
+ }, 100},
+ {"increase max with delay", configs.QueueConfig{
+ Resources: configs.Resources{
+ Max: map[string]string{"memory": "100000000"},
+ },
+ Preemption: configs.Preemption{
+ Delay: 500,
+ },
+ }, 0},
+ {"decrease max with delay", configs.QueueConfig{
+ Resources: configs.Resources{
+ Max: map[string]string{"memory": "100"},
+ },
+ Preemption: configs.Preemption{
+ Delay: 500,
+ },
+ }, 500},
+ {"max remains as is but delay changed", configs.QueueConfig{
+ Resources: configs.Resources{
+ Max: map[string]string{"memory": "100"},
+ },
+ Preemption: configs.Preemption{
+ Delay: 200,
+ },
+ }, 200},
+ {"unrelated config change, should not impact earlier set preemption settings", configs.QueueConfig{
+ Resources: configs.Resources{
+ Max: map[string]string{"memory": "100"},
+ Guaranteed: map[string]string{"memory": "50"},
+ },
+ Preemption: configs.Preemption{
+ Delay: 200,
+ },
+ }, 200},
+ {"increase max again with delay", configs.QueueConfig{
+ Resources: configs.Resources{
+ Max: map[string]string{"memory": "101"},
+ },
+ Preemption: configs.Preemption{
+ Delay: 200,
+ },
+ }, 0}}
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ err = parent.ApplyConf(tc.conf)
+ assert.NilError(t, err, "failed to apply conf: %v", err)
+ assert.Equal(t, parent.quotaChangePreemptionDelay, tc.expectedDelay)
+ })
+ }
+}
+
func TestNewConfiguredQueue(t *testing.T) {
// check variable assignment
properties := getProperties()
@@ -2275,6 +2357,7 @@ func TestNewConfiguredQueue(t *testing.T) {
assert.DeepEqual(t, properties, parent.template.GetProperties())
assert.Assert(t, resources.Equals(resourceStruct, parent.template.GetMaxResource()))
assert.Assert(t, resources.Equals(resourceStruct, parent.template.GetGuaranteedResource()))
+ assert.Equal(t, parent.quotaChangePreemptionDelay, uint64(0))
// case 0: managed leaf queue can't use template
leafConfig := configs.QueueConfig{
@@ -2285,6 +2368,9 @@ func TestNewConfiguredQueue(t *testing.T) {
Max: getResourceConf(),
Guaranteed: getResourceConf(),
},
+ Preemption: configs.Preemption{
+ Delay: 500,
+ },
}
childLeaf, err := NewConfiguredQueue(leafConfig, parent, false, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
@@ -2297,11 +2383,15 @@ func TestNewConfiguredQueue(t *testing.T) {
childLeafGuaranteed, err := resources.NewResourceFromConf(leafConfig.Resources.Guaranteed)
assert.NilError(t, err, "Resource creation failed")
assert.Assert(t, resources.Equals(childLeaf.guaranteedResource, childLeafGuaranteed))
+ assert.Equal(t, childLeaf.quotaChangePreemptionDelay, uint64(500))
// case 1: non-leaf can't use template but it can inherit template from parent
NonLeafConfig := configs.QueueConfig{
Name: "nonleaf_queue",
Parent: true,
+ Preemption: configs.Preemption{
+ Delay: 500,
+ },
}
childNonLeaf, err := NewConfiguredQueue(NonLeafConfig, parent, false, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
@@ -2310,6 +2400,7 @@ func TestNewConfiguredQueue(t *testing.T) {
assert.Equal(t, len(childNonLeaf.properties), 0)
assert.Assert(t, childNonLeaf.guaranteedResource == nil)
assert.Assert(t, childNonLeaf.maxResource == nil)
+ assert.Equal(t, childNonLeaf.quotaChangePreemptionDelay, uint64(0))
// case 2: do not send queue event when silence flag is set to true
events.Init()
@@ -2317,12 +2408,17 @@ func TestNewConfiguredQueue(t *testing.T) {
eventSystem.StartServiceWithPublisher(false)
rootConfig := configs.QueueConfig{
Name: "root",
+ Preemption: configs.Preemption{
+ Delay: 500,
+ },
}
- _, err = NewConfiguredQueue(rootConfig, nil, true, nil)
+
+ rootQ, err := NewConfiguredQueue(rootConfig, nil, true, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
time.Sleep(time.Second)
noEvents := eventSystem.Store.CountStoredEvents()
assert.Equal(t, noEvents, uint64(0), "expected 0 event, got %d", noEvents)
+ assert.Equal(t, rootQ.quotaChangePreemptionDelay, uint64(0))
}
func TestResetRunningState(t *testing.T) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]