This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new 656455ab [YUNIKORN-3193] Quota preemption to be turned off by default
(#1061)
656455ab is described below
commit 656455abb3523ba3802d1e79513b9f34a2cc69da
Author: Wilfred Spiegelenburg <[email protected]>
AuthorDate: Mon Jan 19 11:21:01 2026 +1100
[YUNIKORN-3193] Quota preemption to be turned off by default (#1061)
Quota preemption is new functionality and is turned off by default at the
partition level via a config setting. To use quota preemption, the
'QuotaPreemptionEnabled' flag in the partition preemption config must be
set to 'true'.
----
partitions:
  - name: default
    preemption:
      quotapreemptionenabled: true
----
When turned on, the delay for quota preemption is set via the queue
property `preemption.quota.delay`. The property uses the same syntax as
the preemption delay: a Go duration in string form (for example '60s').
Only positive durations are accepted; any other value leaves the delay
at the default. The default delay is '0s', which means no quota
preemption is triggered.
Quota preemption is only supported on managed queues, not on dynamic
queues.
Closes: #1061
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
(cherry picked from commit 98832728ec40ddd105a48037b06f0399f34ca463)
---
pkg/common/configs/config.go | 37 +-
pkg/common/configs/config_test.go | 7 +-
pkg/common/configs/configvalidator.go | 33 +-
pkg/common/configs/configvalidator_test.go | 28 --
pkg/scheduler/objects/preemption_utilities_test.go | 2 +-
pkg/scheduler/objects/queue.go | 396 +++++++++----------
pkg/scheduler/objects/queue_test.go | 417 +++++++++++----------
...uota_change_preemptor.go => quota_preemptor.go} | 216 +++++------
...e_preemptor_test.go => quota_preemptor_test.go} | 104 +----
pkg/scheduler/partition.go | 21 +-
pkg/scheduler/partition_test.go | 28 +-
pkg/webservice/dao/partition_info.go | 11 +-
pkg/webservice/dao/queue_info.go | 1 +
pkg/webservice/handlers.go | 1 +
14 files changed, 615 insertions(+), 687 deletions(-)
diff --git a/pkg/common/configs/config.go b/pkg/common/configs/config.go
index 792f833b..50a35fe6 100644
--- a/pkg/common/configs/config.go
+++ b/pkg/common/configs/config.go
@@ -32,14 +32,14 @@ import (
"github.com/apache/yunikorn-core/pkg/log"
)
-// The configuration can contain multiple partitions. Each partition contains
the queue definition for a logical
+// SchedulerConfig can contain multiple partitions. Each partition contains
the queue definition for a logical
// set of scheduler resources.
type SchedulerConfig struct {
Partitions []PartitionConfig
Checksum string `yaml:",omitempty" json:",omitempty"`
}
-// The partition object for each partition:
+// PartitionConfig for each partition:
// - the name of the partition
// - a list of sub or child queues
// - a list of placement rule definition objects
@@ -60,12 +60,13 @@ type UserGroupResolver struct {
Type string `yaml:"type,omitempty" json:"type,omitempty"`
}
-// The partition preemption configuration
+// PartitionPreemptionConfig defines global flags for both preemption types
type PartitionPreemptionConfig struct {
- Enabled *bool `yaml:",omitempty" json:",omitempty"`
+ Enabled *bool `yaml:",omitempty" json:",omitempty"`
+ QuotaPreemptionEnabled *bool `yaml:",omitempty" json:",omitempty"`
}
-// The queue object for each queue:
+// QueueConfig object for each queue:
// - the name of the queue
// - a resources object to specify resource limits on the queue
// - the maximum number of applications that can run in the queue
@@ -84,20 +85,16 @@ type QueueConfig struct {
ChildTemplate ChildTemplate `yaml:",omitempty" json:",omitempty"`
Queues []QueueConfig `yaml:",omitempty" json:",omitempty"`
Limits []Limit `yaml:",omitempty" json:",omitempty"`
- Preemption Preemption `yaml:",omitempty" json:",omitempty"`
-}
-
-type Preemption struct {
- Delay uint64 `yaml:",omitempty" json:",omitempty"`
}
+// ChildTemplate set on a parent queue with settings to be applied to the
child created via a placement rule.
type ChildTemplate struct {
MaxApplications uint64 `yaml:",omitempty" json:",omitempty"`
Properties map[string]string `yaml:",omitempty" json:",omitempty"`
Resources Resources `yaml:",omitempty" json:",omitempty"`
}
-// The resource limits to set on the queue. The definition allows for an
unlimited number of types to be used.
+// The Resources limit to apply on the queue. The definition allows for an
unlimited number of types to be used.
// The mapping to "known" resources is not handled here.
// - guaranteed resources
// - max resources
@@ -106,12 +103,12 @@ type Resources struct {
Max map[string]string `yaml:",omitempty" json:",omitempty"`
}
-// The queue placement rule definition
+// The PlacementRule definition:
// - the name of the rule
// - create flag: can the rule create a queue
// - user and group filter to be applied on the callers
// - rule link to allow setting a rule to generate the parent
-// - value a generic value interpreted depending on the rule type (i.e queue
name for the "fixed" rule
+// - value a generic value interpreted depending on the rule type (i.e. queue
name for the "fixed" rule
// or the application label name for the "tag" rule)
type PlacementRule struct {
Name string
@@ -121,7 +118,7 @@ type PlacementRule struct {
Value string `yaml:",omitempty" json:",omitempty"`
}
-// The user and group filter for a rule.
+// Filter for users and groups for a PlacementRule.
// - type of filter (allow or deny filter, empty means allow)
// - list of users to filter (maybe empty)
// - list of groups to filter (maybe empty)
@@ -132,13 +129,13 @@ type Filter struct {
Groups []string `yaml:",omitempty" json:",omitempty"`
}
-// A list of limit objects to define limits for a partition or queue
+// Limits is a list of Limit objects to define user and group limits for a
partition or queue.
type Limits struct {
Limit []Limit
}
-// The limit object to specify user and or group limits at different levels in
the partition or queues
-// Different limits for the same user or group may be defined at different
levels in the hierarchy
+// A Limit object to specify user and or group limits at different levels in
the partition or queues.
+// Different limits for the same user or group may be defined at different
levels in the hierarchy:
// - limit description (optional)
// - list of users (maybe empty)
// - list of groups (maybe empty)
@@ -152,8 +149,10 @@ type Limit struct {
MaxApplications uint64 `yaml:",omitempty" json:",omitempty"`
}
-// Global Node Sorting Policy section
-// - type: different type of policies supported (binpacking, fair etc)
+// NodeSortingPolicy to be applied globally.
+// - type: different type of policies supported (binpacking, fair etc.)
+// - resource weight: factor to be applied to comparisons of different
resource types when sorting nodes. Types not
+// mentioned have a weight of 1.0.
type NodeSortingPolicy struct {
Type string
ResourceWeights map[string]float64 `yaml:",omitempty" json:",omitempty"`
diff --git a/pkg/common/configs/config_test.go
b/pkg/common/configs/config_test.go
index 53f737c0..72eaf31e 100644
--- a/pkg/common/configs/config_test.go
+++ b/pkg/common/configs/config_test.go
@@ -626,6 +626,7 @@ partitions:
- name: root
preemption:
enabled: true
+ quotapreemptionenabled: true
- name: "partition-0"
queues:
- name: root
@@ -637,6 +638,7 @@ partitions:
- name: root
preemption:
enabled: false
+ quotapreemptionenabled: false
- name: "partition-0"
queues:
- name: root
@@ -644,15 +646,18 @@ partitions:
// validate the config and check after the update
conf, err := CreateConfig(data)
assert.NilError(t, err, "should expect no error")
- assert.Assert(t, conf.Partitions[0].Preemption.Enabled == nil, "default
should be nil")
+ assert.Assert(t, conf.Partitions[0].Preemption.Enabled == nil, "default
preemption should be nil")
+ assert.Assert(t, conf.Partitions[0].Preemption.QuotaPreemptionEnabled
== nil, "default quota preemption should be nil")
conf, err = CreateConfig(dataEnabled)
assert.NilError(t, err, "should expect no error")
assert.Assert(t, *conf.Partitions[0].Preemption.Enabled, "preemption
should be enabled")
+ assert.Assert(t, *conf.Partitions[0].Preemption.QuotaPreemptionEnabled,
"quota preemption should be enabled")
conf, err = CreateConfig(dataDisabled)
assert.NilError(t, err, "should expect no error")
assert.Assert(t, !*conf.Partitions[0].Preemption.Enabled, "preemption
should be disabled")
+ assert.Assert(t,
!*conf.Partitions[0].Preemption.QuotaPreemptionEnabled, "quota preemption
should be disabled")
}
func TestParseRule(t *testing.T) {
diff --git a/pkg/common/configs/configvalidator.go
b/pkg/common/configs/configvalidator.go
index 109d295f..23e90577 100644
--- a/pkg/common/configs/configvalidator.go
+++ b/pkg/common/configs/configvalidator.go
@@ -41,6 +41,7 @@ const (
DotReplace = "_dot_"
DefaultPartition = "default"
+ // constants defining the names for properties
ApplicationSortPolicy = "application.sort.policy"
ApplicationSortPriority = "application.sort.priority"
ApplicationUnschedulableAsksBackoff =
"application.unschedasks.backoff"
@@ -49,6 +50,7 @@ const (
PriorityOffset = "priority.offset"
PreemptionPolicy = "preemption.policy"
PreemptionDelay = "preemption.delay"
+ QuotaPreemptionDelay = "preemption.quota.delay"
// app sort priority values
ApplicationSortPriorityEnabled = "enabled"
@@ -61,35 +63,39 @@ const (
errLastQueueLeaf
)
-type placementPathCheckResult int
-
-// Priority
+// MinPriority and MaxPriority used in offset calculations. K8s uses an int32
value for priorities, internal calculations
+// use int64.
var MinPriority int32 = math.MinInt32
var MaxPriority int32 = math.MaxInt32
+// DefaultQuotaPreemptionDelay is 0, no quota preemption by default
+var DefaultQuotaPreemptionDelay time.Duration = 0
+
+// DefaultPreemptionDelay is 30 seconds, guaranteed resources must be set to
trigger preemption
var DefaultPreemptionDelay = 30 * time.Second
var DefaultAskBackOffDelay = 30 * time.Second
-// A queue can be a username with the dot replaced. Most systems allow a 32
character user name.
+// QueueNameRegExp to validate the name of a queue.
+// A queue can be a username with the dot replaced. Most systems allow a 32
character username.
// The queue name must thus allow for at least that length with the
replacement of dots.
var QueueNameRegExp = regexp.MustCompile(`^[a-zA-Z0-9_:#/@-]{1,64}$`)
-// User and group name check: systems allow different things POSIX is the base
but we need to be lenient and allow more.
-// A username must start with a letter(uppercase or lowercase),
-// followed by any number of letter(uppercase or lowercase), digits, ':', '#',
'/', '_', '.', '@', '-', and may end with '$'.
+// UserRegExp to validate a username. Systems allow different things POSIX is
the base, but we need to be lenient and allow more.
+// A username must start with a letter(uppercase or lowercase), followed by
any number of letter(uppercase or lowercase),
+// digits, ':', '#', '/', '_', '.', '@', '-', and may end with '$'.
+// This supports OIDC compliant names.
var UserRegExp = regexp.MustCompile(`^[_a-zA-Z][a-zA-Z0-9:#/_.@-]*[$]?$`)
-// Groups should have a slightly more restrictive regexp (no # / @ or $ at the
end)
+// GroupRegExp is similar to UserRegExp, groups have a slightly more
restrictive regexp (no # / @ or $ at the end)
var GroupRegExp = regexp.MustCompile(`^[_a-zA-Z][a-zA-Z0-9:_.-]*$`)
-// all characters that make a name different from a regexp
+// SpecialRegExp matches all characters that make a name different from a
regexp
var SpecialRegExp = regexp.MustCompile(`[\^$*+?()\[{}|]`)
-// The rule maps to a go identifier check that regexp only
+// RuleNameRegExp as rule names must map to a go identifier check that regexp
only
var RuleNameRegExp = regexp.MustCompile(`^[_a-zA-Z][a-zA-Z0-9_]*$`)
-// Minimum Quota change preemption delay
-var minQuotaChangePreemptionDelay uint64 = 60
+type placementPathCheckResult int
type placementStaticPath struct {
path string
@@ -668,9 +674,6 @@ func checkQueues(queue *QueueConfig, level int) error {
return fmt.Errorf("duplicate child name found with name
'%s', level %d", child.Name, level)
}
queueMap[strings.ToLower(child.Name)] = true
- if queue.Preemption.Delay != 0 && queue.Preemption.Delay <=
minQuotaChangePreemptionDelay {
- return fmt.Errorf("invalid preemption delay %d, must be
between %d and %d", queue.Preemption.Delay, minQuotaChangePreemptionDelay,
uint64(math.MaxUint64))
- }
}
// recurse into the depth if this level passed
diff --git a/pkg/common/configs/configvalidator_test.go
b/pkg/common/configs/configvalidator_test.go
index 9b014c22..4d0c6495 100644
--- a/pkg/common/configs/configvalidator_test.go
+++ b/pkg/common/configs/configvalidator_test.go
@@ -2327,34 +2327,6 @@ func TestCheckQueues(t *testing.T) { //nolint:funlen
assert.Equal(t, 2, len(q.Queues), "Expected two
queues")
},
},
- {
- name: "Invalid Preemption delay for leaf queue",
- queue: &QueueConfig{
- Name: "root",
- Queues: []QueueConfig{
- {Name: "leaf",
- Preemption: Preemption{
- Delay: 10,
- },
- },
- },
- Preemption: Preemption{
- Delay: 10,
- },
- },
- level: 1,
- expectedErrorMsg: "invalid preemption delay 10, must be
between 60 and 18446744073709551615",
- },
- {
- name: "Setting Preemption delay on root queue would be
ignored",
- queue: &QueueConfig{
- Name: "root",
- Preemption: Preemption{
- Delay: 10,
- },
- },
- level: 0,
- },
}
for _, tc := range testCases {
diff --git a/pkg/scheduler/objects/preemption_utilities_test.go
b/pkg/scheduler/objects/preemption_utilities_test.go
index 0d30e79c..b63c85b3 100644
--- a/pkg/scheduler/objects/preemption_utilities_test.go
+++ b/pkg/scheduler/objects/preemption_utilities_test.go
@@ -158,7 +158,7 @@ func resetQueue(queue *Queue) {
queue.maxResource = nil
queue.allocatedResource = nil
queue.guaranteedResource = nil
- queue.isQuotaChangePreemptionRunning = false
+ queue.isQuotaPreemptionRunning = false
queue.preemptingResource = nil
}
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index f8044c24..18342434 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -54,46 +54,46 @@ type Queue struct {
Name string // Queue name as in the config etc.
// Private fields need protection
- sortType policies.SortPolicy // How applications
(leaf) or queues (parents) are sorted
- children map[string]*Queue // Only for direct
children, parent queue only
- childPriorities map[string]int32 // cached priorities for
child queues
- applications map[string]*Application // only for leaf queue
- appPriorities map[string]int32 // cached priorities for
application
- reservedApps map[string]int // applications reserved
within this queue, with reservation count
- parent *Queue // link back to the
parent in the scheduler
- pending *resources.Resource // pending resource for
the apps in the queue
- allocatedResource *resources.Resource // allocated resource for
the apps in the queue
- preemptingResource *resources.Resource // preempting resource
for the apps in the queue
- prioritySortEnabled bool // whether priority is
used for request sorting
- priorityPolicy policies.PriorityPolicy // priority policy
- priorityOffset int32 // priority offset for
this queue relative to others
- preemptionPolicy policies.PreemptionPolicy // preemption policy
- preemptionDelay time.Duration // time before preemption
is considered
- currentPriority int32 // the current scheduling
priority of this queue
+ sortType policies.SortPolicy // How applications
(leaf) or queues (parents) are sorted
+ children map[string]*Queue // Only for direct
children, parent queue only
+ childPriorities map[string]int32 // cached priorities for
child queues
+ applications map[string]*Application // only for leaf queue
+ appPriorities map[string]int32 // cached priorities for
application
+ reservedApps map[string]int // applications reserved
within this queue, with reservation count
+ parent *Queue // link back to the
parent in the scheduler
+ pending *resources.Resource // pending resource for
the apps in the queue
+ allocatedResource *resources.Resource // allocated resource
for the apps in the queue
+ preemptingResource *resources.Resource // preempting resource
for the apps in the queue
+ prioritySortEnabled bool // whether priority is
used for request sorting
+ priorityPolicy policies.PriorityPolicy // priority policy
+ priorityOffset int32 // priority offset for
this queue relative to others
+ preemptionPolicy policies.PreemptionPolicy // preemption policy
+ preemptionDelay time.Duration // time delay before
preemption is considered
+ quotaPreemptionDelay time.Duration // time delay before
quota preemption is considered
+ currentPriority int32 // the current
scheduling priority of this queue
// The queue properties should be treated as immutable the value is a
merge of the
// parent properties with the config for this queue only manipulated
during creation
// of the queue or via a queue configuration update.
- properties map[string]string
- adminACL security.ACL // admin ACL
- submitACL security.ACL // submit ACL
- maxResource *resources.Resource // When not set, max
= nil
- guaranteedResource *resources.Resource // When not set,
Guaranteed == 0
- isLeaf bool // this is a leaf
queue or not (i.e. parent)
- isManaged bool // queue is part of
the config, not auto created
- stateMachine *fsm.FSM // the state of the
queue for scheduling
- stateTime time.Time // last time the
state was updated (needed for cleanup)
- maxRunningApps uint64
- runningApps uint64
- allocatingAcceptedApps map[string]bool
- template *template.Template
- queueEvents *schedEvt.QueueEvents
- appQueueMapping *AppQueueMapping // appID mapping to
queues
- quotaChangePreemptionDelay uint64
- quotaChangePreemptionStartTime time.Time
- isQuotaChangePreemptionRunning bool
- unschedAskBackoff uint64
- askBackoffDelay time.Duration
+ properties map[string]string
+ adminACL security.ACL // admin ACL
+ submitACL security.ACL // submit ACL
+ maxResource *resources.Resource // When not set, max = nil
+ guaranteedResource *resources.Resource // When not set,
Guaranteed == 0
+ isLeaf bool // this is a leaf queue or
not (i.e. parent)
+ isManaged bool // queue is part of the
config, not auto created
+ stateMachine *fsm.FSM // the state of the queue
for scheduling
+ stateTime time.Time // last time the state was
updated (needed for cleanup)
+ maxRunningApps uint64
+ runningApps uint64
+ allocatingAcceptedApps map[string]bool
+ template *template.Template
+ queueEvents *schedEvt.QueueEvents
+ appQueueMapping *AppQueueMapping // appID mapping to queues
+ quotaPreemptionStartTime time.Time
+ isQuotaPreemptionRunning bool
+ unschedAskBackoff uint64
+ askBackoffDelay time.Duration
locking.RWMutex
}
@@ -101,24 +101,24 @@ type Queue struct {
// newBlankQueue creates a new empty queue objects with all values initialised.
func newBlankQueue() *Queue {
return &Queue{
- children: make(map[string]*Queue),
- childPriorities: make(map[string]int32),
- applications: make(map[string]*Application),
- appPriorities: make(map[string]int32),
- reservedApps: make(map[string]int),
- allocatingAcceptedApps: make(map[string]bool),
- properties: make(map[string]string),
- stateMachine: NewObjectState(),
- allocatedResource: resources.NewResource(),
- preemptingResource: resources.NewResource(),
- pending: resources.NewResource(),
- currentPriority: configs.MinPriority,
- prioritySortEnabled: true,
- preemptionDelay: configs.DefaultPreemptionDelay,
- preemptionPolicy:
policies.DefaultPreemptionPolicy,
- quotaChangePreemptionDelay: 0,
- quotaChangePreemptionStartTime: time.Time{},
- askBackoffDelay: configs.DefaultAskBackOffDelay,
+ children: make(map[string]*Queue),
+ childPriorities: make(map[string]int32),
+ applications: make(map[string]*Application),
+ appPriorities: make(map[string]int32),
+ reservedApps: make(map[string]int),
+ allocatingAcceptedApps: make(map[string]bool),
+ properties: make(map[string]string),
+ stateMachine: NewObjectState(),
+ allocatedResource: resources.NewResource(),
+ preemptingResource: resources.NewResource(),
+ pending: resources.NewResource(),
+ currentPriority: configs.MinPriority,
+ prioritySortEnabled: true,
+ preemptionDelay: configs.DefaultPreemptionDelay,
+ preemptionPolicy: policies.DefaultPreemptionPolicy,
+ quotaPreemptionDelay: 0,
+ quotaPreemptionStartTime: time.Time{},
+ askBackoffDelay: configs.DefaultAskBackOffDelay,
}
}
@@ -139,7 +139,7 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent
*Queue, silence bool, a
sq.updateMaxRunningAppsMetrics()
// update the properties
- if err := sq.applyConf(conf, silence); err != nil {
+ if _, err := sq.applyConf(conf, silence); err != nil {
return nil, errors.Join(errors.New("configured queue creation
failed: "), err)
}
@@ -148,13 +148,13 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent
*Queue, silence bool, a
if parent != nil {
// pull the properties from the parent that should be set on
the child
sq.mergeProperties(parent.getProperties(), conf.Properties)
- sq.UpdateQueueProperties()
+ sq.UpdateQueueProperties(nil)
err := parent.addChildQueue(sq)
if err != nil {
return nil, errors.Join(errors.New("configured queue
creation failed: "), err)
}
} else {
- sq.UpdateQueueProperties()
+ sq.UpdateQueueProperties(nil)
}
if !silence {
@@ -219,7 +219,7 @@ func newDynamicQueueInternal(name string, leaf bool, parent
*Queue, appQueueMapp
return nil, errors.Join(errors.New("dynamic queue creation
failed: "), err)
}
- sq.UpdateQueueProperties()
+ sq.UpdateQueueProperties(nil)
sq.queueEvents = schedEvt.NewQueueEvents(events.GetEventSystem())
log.Log(log.SchedQueue).Info("dynamic queue added to scheduler",
zap.String("queueName", sq.QueuePath))
@@ -272,13 +272,13 @@ func (sq *Queue) mergeProperties(parent, config
map[string]string) {
}
}
-func preemptionDelay(value string) (time.Duration, error) {
+func convertDelay(value string, def time.Duration) (time.Duration, error) {
result, err := time.ParseDuration(value)
if err != nil {
- return configs.DefaultPreemptionDelay, err
+ return def, err
}
if int64(result) <= int64(0) {
- return configs.DefaultPreemptionDelay, fmt.Errorf("%s must be
positive: %s", configs.PreemptionDelay, value)
+ return def, fmt.Errorf("delay must be positive value: %s",
value)
}
return result, nil
}
@@ -328,19 +328,8 @@ func unschedulableAskBackoff(value string) (uint64, error)
{
return intValue, nil
}
-func backoffDelay(value string) (time.Duration, error) {
- result, err := time.ParseDuration(value)
- if err != nil {
- return configs.DefaultAskBackOffDelay, err
- }
- if int64(result) <= int64(0) {
- return configs.DefaultAskBackOffDelay, fmt.Errorf("%s must be
positive: %s", configs.ApplicationUnschedulableAsksBackoffDelay, value)
- }
- return result, nil
-}
-
// ApplyConf is the locked version of applyConf
-func (sq *Queue) ApplyConf(conf configs.QueueConfig) error {
+func (sq *Queue) ApplyConf(conf configs.QueueConfig) (*resources.Resource,
error) {
sq.Lock()
defer sq.Unlock()
return sq.applyConf(conf, false)
@@ -349,20 +338,21 @@ func (sq *Queue) ApplyConf(conf configs.QueueConfig)
error {
// applyConf applies all the properties to the queue from the config.
// lock free call, must be called holding the queue lock or during create only.
// If the silence flag is set to true, the function will not log when setting
users and groups.
-func (sq *Queue) applyConf(conf configs.QueueConfig, silence bool) error {
+// This function MUST be called holding the lock for the queue.
+func (sq *Queue) applyConf(conf configs.QueueConfig, silence bool)
(*resources.Resource, error) {
// Set the ACLs
var err error
sq.submitACL, err = security.NewACL(conf.SubmitACL, silence)
if err != nil {
log.Log(log.SchedQueue).Error("parsing submit ACL failed this
should not happen",
zap.Error(err))
- return err
+ return nil, err
}
sq.adminACL, err = security.NewACL(conf.AdminACL, silence)
if err != nil {
log.Log(log.SchedQueue).Error("parsing admin ACL failed this
should not happen",
zap.Error(err))
- return err
+ return nil, err
}
// Change from unmanaged to managed
if !sq.isManaged {
@@ -392,85 +382,129 @@ func (sq *Queue) applyConf(conf configs.QueueConfig,
silence bool) error {
if !sq.isLeaf {
if err = sq.setTemplate(conf.ChildTemplate); err != nil {
- return err
+ return nil, err
}
}
+ oldMaxResource := sq.maxResource
// Load the max & guaranteed resources and maxApps for all but the root
queue
if sq.Name != configs.RootQueue {
- oldMaxResource := sq.maxResource
if err = sq.setResourcesFromConf(conf.Resources); err != nil {
- return err
+ return nil, err
}
sq.maxRunningApps = conf.MaxApplications
sq.updateMaxRunningAppsMetrics()
- sq.setPreemptionSettings(oldMaxResource, conf)
}
-
sq.properties = conf.Properties
- return nil
+ return oldMaxResource, nil
}
-// setPreemptionSettings Set Quota change preemption settings
-func (sq *Queue) setPreemptionSettings(oldMaxResource *resources.Resource,
conf configs.QueueConfig) {
- newMaxResource, err := resources.NewResourceFromConf(conf.Resources.Max)
- if err != nil {
- log.Log(log.SchedQueue).Error("parsing failed on max resources
this should not happen",
- zap.String("queue", sq.QueuePath),
- zap.Error(err))
+// setPreemptionTime set the time the quota preemption should be triggered
when a quota is changed.
+// Updates the time if the delay is set / changes and the preemption is not
running yet.
+// This function MUST be called holding the lock for the queue.
+func (sq *Queue) setPreemptionTime(oldMaxResource *resources.Resource,
oldDelay time.Duration) {
+ // if quota preemption is running nothing we do not have an influence
+ // if the delay for the queue is not set do not trigger preemption
+ if sq.isQuotaPreemptionRunning || sq.quotaPreemptionDelay == 0 {
return
}
-
- switch {
- // Set max res earlier but not now
- case resources.IsZero(newMaxResource) &&
!resources.IsZero(oldMaxResource):
- sq.quotaChangePreemptionDelay = 0
- sq.quotaChangePreemptionStartTime = time.Time{}
- // Set max res now but not earlier
- case !resources.IsZero(newMaxResource) &&
resources.IsZero(oldMaxResource) && conf.Preemption.Delay != 0:
- sq.quotaChangePreemptionDelay = conf.Preemption.Delay
- sq.quotaChangePreemptionStartTime =
time.Now().Add(time.Duration(int64(sq.quotaChangePreemptionDelay)) *
time.Second) //nolint:gosec
- // Set max res earlier and now as well
- default:
- switch {
- // Quota decrease
- case resources.StrictlyGreaterThan(oldMaxResource,
newMaxResource) && conf.Preemption.Delay != 0:
- sq.quotaChangePreemptionDelay = conf.Preemption.Delay
- sq.quotaChangePreemptionStartTime =
time.Now().Add(time.Duration(sq.quotaChangePreemptionDelay) * time.Second)
//nolint:gosec
- // Quota increase
- case resources.StrictlyGreaterThan(newMaxResource,
oldMaxResource) && conf.Preemption.Delay != 0:
- sq.quotaChangePreemptionDelay = 0
- sq.quotaChangePreemptionStartTime = time.Time{}
- // Quota remains as is but delay has changed
- case resources.Equals(oldMaxResource, newMaxResource) &&
conf.Preemption.Delay != 0 && sq.quotaChangePreemptionDelay !=
conf.Preemption.Delay:
- sq.quotaChangePreemptionDelay = conf.Preemption.Delay
- sq.quotaChangePreemptionStartTime =
time.Now().Add(time.Duration(sq.quotaChangePreemptionDelay) * time.Second)
//nolint:gosec
- default:
- // noop
+ // if no current limit we should not preempt even if it was set
earlier, clear the start time
+ if resources.IsZero(sq.maxResource) {
+ sq.quotaPreemptionStartTime = time.Time{}
+ log.Log(log.SchedQueue).Info("removed quota preemption start
time",
+ zap.String("queue", sq.QueuePath))
+ return
+ }
+ // adjust the startup based on the diff between the delays if no change
+ if resources.Equals(oldMaxResource, sq.maxResource) {
+ if sq.quotaPreemptionStartTime.IsZero() {
+ // if a delay is set later than the quota change we
would like to trigger preemption even if no change for
+ // max resources is detected. preemption trigger will
clean up if usage is below max already.
+ if oldDelay == 0 && sq.quotaPreemptionDelay > 0 {
+ sq.quotaPreemptionStartTime =
time.Now().Add(sq.quotaPreemptionDelay)
+ log.Log(log.SchedQueue).Info("set quota
preemption start time based on delay change",
+ zap.String("queue", sq.QueuePath),
+ zap.Time("quotaPreemptionStartTime",
sq.quotaPreemptionStartTime))
+ }
+ } else {
+ if oldDelay != sq.quotaPreemptionDelay {
+ // apply the delta: this can be positive or
negative does not matter. The resulting time could be in the
+ // past: also not an issue.
+ sq.quotaPreemptionStartTime =
sq.quotaPreemptionStartTime.Add(sq.quotaPreemptionDelay - oldDelay)
+ log.Log(log.SchedQueue).Info("updated quota
preemption start time based on delay change",
+ zap.String("queue", sq.QueuePath),
+ zap.Time("quotaPreemptionStartTime",
sq.quotaPreemptionStartTime),
+ zap.Duration("delta",
sq.quotaPreemptionDelay-oldDelay))
+ }
}
+ return
+ }
+ // quota has been lowered, need to set the start time.
+ if resources.StrictlyGreaterThan(oldMaxResource, sq.maxResource) {
+ if !sq.quotaPreemptionStartTime.IsZero() {
+ // multiple consecutive lowering of quotas detected:
check for delay change
+ if oldDelay != sq.quotaPreemptionDelay {
+ sq.quotaPreemptionStartTime =
sq.quotaPreemptionStartTime.Add(sq.quotaPreemptionDelay - oldDelay)
+ log.Log(log.SchedQueue).Info("consecutive
lowering of quota with delay change",
+ zap.String("queue", sq.QueuePath),
+ zap.Time("quotaPreemptionStartTime",
sq.quotaPreemptionStartTime),
+ zap.Duration("delta",
sq.quotaPreemptionDelay-oldDelay))
+ return
+ }
+ log.Log(log.SchedQueue).Info("consecutive lowering of
quota: preemption time already set from earlier quota change",
+ zap.String("queue", sq.QueuePath),
+ zap.Time("quotaPreemptionStartTime",
sq.quotaPreemptionStartTime))
+ return
+ }
+ sq.quotaPreemptionStartTime =
time.Now().Add(sq.quotaPreemptionDelay)
+ log.Log(log.SchedQueue).Info("Setting quota preemption time",
+ zap.String("queue", sq.QueuePath),
+ zap.Stringer("old maxResource", oldMaxResource),
+ zap.Stringer("maxResources", sq.maxResource),
+ zap.Duration("delay", sq.quotaPreemptionDelay),
+ zap.Time("quotaPreemptionStartTime",
sq.quotaPreemptionStartTime))
}
}
-// resetPreemptionSettings Reset Quota change preemption settings
-func (sq *Queue) resetPreemptionSettings() {
- sq.Lock()
- defer sq.Unlock()
- sq.quotaChangePreemptionDelay = 0
- sq.quotaChangePreemptionStartTime = time.Time{}
-}
-
-// shouldTriggerPreemption Should preemption be triggered or not to enforce
new max quota?
+// shouldTriggerPreemption returns true if quota preemption should be
triggered based on the settings and the
+// current time.
func (sq *Queue) shouldTriggerPreemption() bool {
sq.RLock()
defer sq.RUnlock()
- return sq.quotaChangePreemptionDelay != 0 &&
!sq.quotaChangePreemptionStartTime.IsZero() &&
time.Now().After(sq.quotaChangePreemptionStartTime)
+ // dynamic queues do not support quota preemption
+ if !sq.isManaged {
+ return false
+ }
+ // already in progress do not run again
+ if sq.isQuotaPreemptionRunning {
+ return false
+ }
+ // usage is below max: no need to trigger. Happens if the queue drops
below the new max when pods stop.
+ // Should clean up, but we have just a read lock...
+ if
sq.maxResource.StrictlyGreaterThanOrEqualsOnlyExisting(sq.allocatedResource) {
+ sq.quotaPreemptionStartTime = time.Time{}
+ return false
+ }
+ // trigger if the time is right
+ return !sq.quotaPreemptionStartTime.IsZero() &&
time.Now().After(sq.quotaPreemptionStartTime)
}
-// getPreemptionSettings Get preemption settings. Only for testing
-func (sq *Queue) getPreemptionSettings() (uint64, time.Time) {
+// setQuotaPreemptionState set or clear the running state for quota
preemption. When done the start time is also cleared
+// out preventing a re-run based on the same change
+func (sq *Queue) setQuotaPreemptionState(run bool) {
+ sq.Lock()
+ defer sq.Unlock()
+ if !run {
+ sq.quotaPreemptionStartTime = time.Time{}
+ }
+ sq.isQuotaPreemptionRunning = run
+}
+
+// getQuotaPreemptionRunning returns the running state for quota preemption.
+func (sq *Queue) getQuotaPreemptionRunning() bool {
sq.RLock()
defer sq.RUnlock()
- return sq.quotaChangePreemptionDelay, sq.quotaChangePreemptionStartTime
+ return sq.isQuotaPreemptionRunning
}
// setResourcesFromConf sets the maxResource and guaranteedResource of the
queue from the config.
@@ -578,7 +612,7 @@ func (sq *Queue) setTemplate(conf configs.ChildTemplate)
error {
}
// UpdateQueueProperties updates the queue properties defined as text
-func (sq *Queue) UpdateQueueProperties() {
+func (sq *Queue) UpdateQueueProperties(oldMaxResource *resources.Resource) {
sq.Lock()
defer sq.Unlock()
if common.IsRecoveryQueue(sq.QueuePath) {
@@ -632,26 +666,32 @@ func (sq *Queue) UpdateQueueProperties() {
}
case configs.PreemptionDelay:
if sq.isLeaf {
- sq.preemptionDelay, err = preemptionDelay(value)
+ sq.preemptionDelay, err = convertDelay(value,
configs.DefaultPreemptionDelay)
if err != nil {
log.Log(log.SchedQueue).Debug("preemption delay property configuration error",
zap.Error(err))
}
}
case configs.ApplicationUnschedulableAsksBackoff:
- unschedAskBackoff, err := unschedulableAskBackoff(value)
+ sq.unschedAskBackoff, err =
unschedulableAskBackoff(value)
if err != nil {
log.Log(log.SchedQueue).Debug("unschedulable
ask backoff configuration error",
zap.Error(err))
}
- sq.unschedAskBackoff = unschedAskBackoff
case configs.ApplicationUnschedulableAsksBackoffDelay:
- askBackoffDelay, err := backoffDelay(value)
+ sq.askBackoffDelay, err = convertDelay(value,
configs.DefaultAskBackOffDelay)
if err != nil {
log.Log(log.SchedQueue).Debug("unschedulable
ask backoff delay configuration error",
zap.Error(err))
}
- sq.askBackoffDelay = askBackoffDelay
+ case configs.QuotaPreemptionDelay:
+ oldDelay := sq.quotaPreemptionDelay
+ sq.quotaPreemptionDelay, err = convertDelay(value,
configs.DefaultQuotaPreemptionDelay)
+ if err != nil {
+ log.Log(log.SchedQueue).Debug("quota preemption
delay configuration error",
+ zap.Error(err))
+ }
+ sq.setPreemptionTime(oldMaxResource, oldDelay)
default:
// skip unknown properties just log them
log.Log(log.SchedQueue).Debug("queue property skipped",
@@ -725,7 +765,7 @@ func (sq *Queue) GetPreemptingResource()
*resources.Resource {
func (sq *Queue) GetGuaranteedResource() *resources.Resource {
sq.RLock()
defer sq.RUnlock()
- return sq.guaranteedResource
+ return sq.guaranteedResource.Clone()
}
// GetMaxApps returns the maximum number of applications that can run in this
queue.
@@ -735,7 +775,7 @@ func (sq *Queue) GetMaxApps() uint64 {
return sq.maxRunningApps
}
-// GetActualGuaranteedResources returns the actual (including parent)
guaranteed resources for the queue.
+// GetActualGuaranteedResource returns the actual (including parent)
guaranteed resources for the queue.
func (sq *Queue) GetActualGuaranteedResource() *resources.Resource {
if sq == nil {
return resources.NewResource()
@@ -817,6 +857,7 @@ func (sq *Queue) GetPartitionQueueDAOInfo(include bool)
dao.PartitionQueueDAOInf
queueInfo.PreemptionEnabled = sq.preemptionPolicy !=
policies.DisabledPreemptionPolicy
queueInfo.IsPreemptionFence = sq.preemptionPolicy ==
policies.FencePreemptionPolicy
queueInfo.PreemptionDelay = sq.preemptionDelay.String()
+ queueInfo.QuotaPreemptionDelay = sq.quotaPreemptionDelay.String()
queueInfo.IsPriorityFence = sq.priorityPolicy ==
policies.FencePriorityPolicy
queueInfo.PriorityOffset = sq.priorityOffset
queueInfo.Properties = make(map[string]string)
@@ -1402,7 +1443,7 @@ func (sq *Queue) GetMaxResource() *resources.Resource {
return sq.internalGetMax(limit)
}
-func (sq *Queue) CloneMaxResource() *resources.Resource {
+func (sq *Queue) cloneMaxResource() *resources.Resource {
sq.RLock()
defer sq.RUnlock()
return sq.maxResource.Clone()
@@ -1540,7 +1581,25 @@ func (sq *Queue) canRunApp(appID string) bool {
// resources are skipped.
// Applications are sorted based on the application sortPolicy. Applications
without pending resources are skipped.
// Lock free call this all locks are taken when needed in called functions
-func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func()
NodeIterator, getnode func(string) *Node, allowPreemption bool)
*AllocationResult {
+func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func()
NodeIterator, getnode func(string) *Node, allowPreemption, quotaPreemption
bool) *AllocationResult {
+ if quotaPreemption && sq.shouldTriggerPreemption() {
+ go func() {
+ log.Log(log.SchedQueue).Info("Preconditions has passed
trigger preemption to enforce new max resources",
+ zap.String("queueName", sq.GetQueuePath()),
+ zap.Stringer("maxResource",
sq.cloneMaxResource()))
+ preemptor := NewQuotaPreemptor(sq)
+ preemptor.tryPreemption()
+ }()
+ // when we trigger for a parent queue do not trigger for the
children just yet. At least wait until the next
+ // scheduling cycle.
+ quotaPreemption = false
+ }
+ // if quota preemption is running for this queue we do not want to
trigger for any of the children.
+ // we do a top-down approach: parent first and when done we check the
children
+ // there could be a child quota preemption running already
+ if sq.getQuotaPreemptionRunning() {
+ quotaPreemption = false
+ }
if sq.IsLeafQueue() {
// get the headroom
headRoom := sq.getHeadRoom()
@@ -1574,26 +1633,10 @@ func (sq *Queue) TryAllocate(iterator func()
NodeIterator, fullIterator func() N
return result
}
}
-
- // Should we trigger preemption to enforce new quota?
- if sq.shouldTriggerPreemption() {
- go func() {
- log.Log(log.SchedQueue).Info("Trigger
preemption to enforce new max resources",
- zap.String("queueName", sq.QueuePath),
- zap.String("max resources",
sq.maxResource.String()))
- preemptor := NewQuotaChangePreemptor(sq)
- if preemptor.CheckPreconditions() {
-
log.Log(log.SchedQueue).Info("Preconditions has passed to trigger preemption to
enforce new max resources",
- zap.String("queueName",
sq.QueuePath),
- zap.String("max resources",
sq.maxResource.String()))
- preemptor.tryPreemption()
- }
- }()
- }
} else {
// process the child queues (filters out queues without pending
requests)
for _, child := range sq.sortQueues() {
- result := child.TryAllocate(iterator, fullIterator,
getnode, allowPreemption)
+ result := child.TryAllocate(iterator, fullIterator,
getnode, allowPreemption, quotaPreemption)
if result != nil {
return result
}
@@ -2190,47 +2233,6 @@ func (sq *Queue) recalculatePriority() int32 {
return priorityValueByPolicy(sq.priorityPolicy, sq.priorityOffset, curr)
}
-func (sq *Queue) MarkQuotaChangePreemptionRunning(run bool) {
- sq.Lock()
- defer sq.Unlock()
- sq.isQuotaChangePreemptionRunning = run
-}
-
-func (sq *Queue) isQCPreemptionRunningForParent() bool {
- if sq == nil {
- return false
- }
- if sq.parent != nil {
- if sq.parent.isQCPreemptionRunningForParent() {
- return true
- }
- }
- sq.RLock()
- defer sq.RUnlock()
- return sq.isQuotaChangePreemptionRunning
-}
-
-func (sq *Queue) isQCPreemptionRunningForChild() bool {
- for _, child := range sq.GetCopyOfChildren() {
- if child.isQCPreemptionRunningForChild() {
- return true
- }
- }
- sq.RLock()
- defer sq.RUnlock()
- return sq.isQuotaChangePreemptionRunning
-}
-
-func (sq *Queue) IsQCPreemptionRunning() bool {
- if sq.isQCPreemptionRunningForParent() {
- return true
- }
- if sq.isQCPreemptionRunningForChild() {
- return true
- }
- return false
-}
-
func (sq *Queue) GetMaxAppUnschedAskBackoff() uint64 {
sq.RLock()
defer sq.RUnlock()
diff --git a/pkg/scheduler/objects/queue_test.go
b/pkg/scheduler/objects/queue_test.go
index 4308749a..399e8581 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -43,8 +43,6 @@ import (
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
-const ZeroResource string = "{\"resources\":{\"first\":{\"value\":0}}}"
-
// base test for creating a managed queue
func TestQueueBasics(t *testing.T) {
// create the root
@@ -1794,7 +1792,7 @@ func TestGetPartitionQueueDAOInfo(t *testing.T) {
configs.PreemptionDelay: "3600s",
configs.PreemptionPolicy:
policies.FencePreemptionPolicy.String(),
}
- leaf.UpdateQueueProperties()
+ leaf.UpdateQueueProperties(nil)
leafDAO := leaf.GetPartitionQueueDAOInfo(false)
assert.Equal(t, leafDAO.QueueName, "root.leaf-queue")
assert.Equal(t, len(leafDAO.Children), 0, "leaf has no children")
@@ -1810,7 +1808,7 @@ func TestGetPartitionQueueDAOInfo(t *testing.T) {
configs.PreemptionDelay: "10s",
configs.PreemptionPolicy:
policies.DisabledPreemptionPolicy.String(),
}
- leaf.UpdateQueueProperties()
+ leaf.UpdateQueueProperties(nil)
leafDAO = leaf.GetPartitionQueueDAOInfo(false)
assert.Equal(t, leafDAO.PreemptionEnabled, false, "preemption should
not be enabled")
assert.Equal(t, leafDAO.IsPreemptionFence, false, "queue should not be
a fence")
@@ -2230,14 +2228,14 @@ func TestApplyConf(t *testing.T) {
errConf1 := configs.QueueConfig{
SubmitACL: "error Submit ACL",
}
- err = errQueue.ApplyConf(errConf1)
+ _, err = errQueue.ApplyConf(errConf1)
assert.ErrorContains(t, err, "multiple spaces found in ACL")
// wrong AdminACL
errConf2 := configs.QueueConfig{
AdminACL: "error Admin ACL",
}
- err = errQueue.ApplyConf(errConf2)
+ _, err = errQueue.ApplyConf(errConf2)
assert.ErrorContains(t, err, "multiple spaces found in ACL")
// wrong ChildTemplate
@@ -2249,7 +2247,7 @@ func TestApplyConf(t *testing.T) {
},
},
}
- err = errQueue.ApplyConf(errConf3)
+ _, err = errQueue.ApplyConf(errConf3)
assert.ErrorContains(t, err, "invalid quantity")
// isManaged is changed from unmanaged to managed
@@ -2261,7 +2259,7 @@ func TestApplyConf(t *testing.T) {
child, err := NewDynamicQueue("child", true, parent, nil)
assert.NilError(t, err, "failed to create basic queue: %v", err)
- err = child.ApplyConf(childConf)
+ _, err = child.ApplyConf(childConf)
assert.NilError(t, err, "failed to parse conf: %v", err)
assert.Equal(t, child.IsManaged(), true)
@@ -2278,7 +2276,7 @@ func TestApplyConf(t *testing.T) {
parent, err = createManagedQueueWithProps(nil, "parent", false, nil,
nil)
assert.NilError(t, err, "failed to create basic queue: %v", err)
- err = parent.ApplyConf(parentConf)
+ _, err = parent.ApplyConf(parentConf)
assert.NilError(t, err, "failed to parse conf: %v", err)
assert.Equal(t, parent.IsLeafQueue(), false)
@@ -2308,7 +2306,7 @@ func TestApplyConf(t *testing.T) {
assert.Assert(t, validTemplate != nil)
conf.Parent = false
- err = leaf.ApplyConf(conf)
+ _, err = leaf.ApplyConf(conf)
assert.NilError(t, err, "failed to apply conf: %v", err)
assert.Assert(t, leaf.template == nil)
assert.Assert(t, leaf.maxResource != nil)
@@ -2320,7 +2318,7 @@ func TestApplyConf(t *testing.T) {
assert.NilError(t, err, "failed to create basic queue: %v", err)
conf.Parent = true
- err = queue.ApplyConf(conf)
+ _, err = queue.ApplyConf(conf)
assert.NilError(t, err, "failed to apply conf: %v", err)
assert.Assert(t, queue.template != nil)
assert.Assert(t, queue.maxResource != nil)
@@ -2331,142 +2329,193 @@ func TestApplyConf(t *testing.T) {
root, err := createManagedQueueWithProps(nil, "root", true, nil, nil)
assert.NilError(t, err, "failed to create basic queue: %v", err)
- err = queue.ApplyConf(conf)
+ _, err = queue.ApplyConf(conf)
assert.NilError(t, err, "failed to apply conf: %v", err)
assert.Assert(t, root.maxResource == nil)
assert.Assert(t, root.guaranteedResource == nil)
assert.Equal(t, root.maxRunningApps, uint64(0))
}
-func TestQuotaChangePreemptionSettings(t *testing.T) {
- root, err := createManagedQueueWithProps(nil, "root", true, nil, nil)
+func TestQuotaPreemptionSettings(t *testing.T) {
+ root, err := createManagedQueue(nil, "root", true, nil)
assert.NilError(t, err, "failed to create basic queue: %v", err)
- parent, err := createManagedQueueWithProps(root, "parent", false, nil,
nil)
- assert.NilError(t, err, "failed to create basic queue: %v", err)
testCases := []struct {
- name string
- conf configs.QueueConfig
- expectedDelay uint64
- }{{"first time queue setup without delay", configs.QueueConfig{
- Resources: configs.Resources{
- Max: getResourceConf(),
- Guaranteed: getResourceConf(),
- },
- }, 0},
- {"clearing max resources", configs.QueueConfig{
- Resources: configs.Resources{
- Max: nil,
- Guaranteed: nil,
- },
- }, 0},
- {"first time queue setup with delay", configs.QueueConfig{
- Resources: configs.Resources{
- Max: getResourceConf(),
- Guaranteed: getResourceConf(),
- },
- Preemption: configs.Preemption{
- Delay: 1,
- },
- }, 1},
- {"increase max with delay", configs.QueueConfig{
- Resources: configs.Resources{
- Max: map[string]string{"memory": "100000000"},
- },
- Preemption: configs.Preemption{
- Delay: 500,
- },
- }, 0},
- {"decrease max with delay", configs.QueueConfig{
- Resources: configs.Resources{
- Max: map[string]string{"memory": "100"},
- },
- Preemption: configs.Preemption{
- Delay: 2,
- },
- }, 2},
- {"max remains as is but delay changed", configs.QueueConfig{
- Resources: configs.Resources{
- Max: map[string]string{"memory": "100"},
- },
- Preemption: configs.Preemption{
- Delay: 2,
- },
- }, 2},
- {"unrelated config change, should not impact earlier set
preemption settings", configs.QueueConfig{
- Resources: configs.Resources{
- Max: map[string]string{"memory": "100"},
- Guaranteed: map[string]string{"memory": "50"},
- },
- Preemption: configs.Preemption{
- Delay: 2,
- },
- }, 2},
- {"increase max again with delay", configs.QueueConfig{
- Resources: configs.Resources{
- Max: map[string]string{"memory": "101"},
- },
- Preemption: configs.Preemption{
- Delay: 200,
- },
- }, 0}}
-
+ name string
+ maxRes map[string]string
+ conf configs.QueueConfig
+ oldDelay time.Duration
+ timeChange bool
+ }{
+ {"clearing max",
+ map[string]string{"memory": "500"},
+ configs.QueueConfig{
+ Resources: configs.Resources{
+ Max: nil,
+ },
+ }, 0, false},
+ {"clearing max with delay",
+ map[string]string{"memory": "500"},
+ configs.QueueConfig{
+ Resources: configs.Resources{
+ Max: nil,
+ },
+ Properties:
map[string]string{configs.QuotaPreemptionDelay: "50ms"},
+ }, 0, false},
+ {"incorrect delay",
+ map[string]string{"memory": "500"},
+ configs.QueueConfig{
+ Resources: configs.Resources{
+ Max: nil,
+ },
+ Properties:
map[string]string{configs.QuotaPreemptionDelay: "-50s"},
+ }, 0, false},
+ {"increase max with delay",
+ map[string]string{"memory": "500"},
+ configs.QueueConfig{
+ Resources: configs.Resources{
+ Max: map[string]string{"memory":
"1000"},
+ },
+ Properties:
map[string]string{configs.QuotaPreemptionDelay: "50ms"},
+ }, 50 * time.Millisecond, false},
+ {"decrease max with delay",
+ map[string]string{"memory": "500"},
+ configs.QueueConfig{
+ Resources: configs.Resources{
+ Max: map[string]string{"memory": "100"},
+ },
+ Properties:
map[string]string{configs.QuotaPreemptionDelay: "50ms"},
+ }, 50 * time.Millisecond, true},
+ {"delay changed from 0 no max change",
+ map[string]string{"memory": "500"},
+ configs.QueueConfig{
+ Resources: configs.Resources{
+ Max: map[string]string{"memory": "500"},
+ },
+ Properties:
map[string]string{configs.QuotaPreemptionDelay: "50ms"},
+ }, 0, true},
+ }
+
+ var oldMax *resources.Resource
+ var parent *Queue
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
- err = parent.ApplyConf(tc.conf)
+ var expectedMax *resources.Resource
+ if tc.maxRes != nil {
+ expectedMax, err =
resources.NewResourceFromConf(tc.maxRes)
+ assert.NilError(t, err, "resource creation
failed")
+ }
+ parent, err = createManagedQueue(root, "parent", false,
tc.maxRes)
+ assert.NilError(t, err, "failed to create basic queue:
%v", err)
+ oldMax, err = parent.ApplyConf(tc.conf)
assert.NilError(t, err, "failed to apply conf: %v", err)
+ assert.Assert(t, resources.Equals(oldMax, expectedMax),
"old max not from setup")
- // assert the preemption settings
- delay, sTime := parent.getPreemptionSettings()
- assert.Equal(t, delay, tc.expectedDelay)
- if tc.expectedDelay != uint64(0) {
- assert.Equal(t, sTime.IsZero(), false)
- } else {
- assert.Equal(t, sTime.IsZero(), true)
- }
+ // apply properties and set a usage
+ parent.allocatedResource = resources.Multiply(oldMax, 2)
+ parent.quotaPreemptionDelay = tc.oldDelay
+ parent.UpdateQueueProperties(oldMax)
+ // Wait till delay expires to let trigger preemption
automatically
+ time.Sleep(parent.quotaPreemptionDelay +
50*time.Millisecond)
+ assert.Equal(t, parent.shouldTriggerPreemption(),
tc.timeChange, "preemption should get trigger for set delay")
+ parent.TryAllocate(nil, nil, nil, false, true)
+
+ time.Sleep(50 * time.Millisecond)
+
+ // since preemption settings are reset, preemption
should not be triggerred again during the next check
assert.Equal(t, parent.shouldTriggerPreemption(), false)
+ })
+ }
+}
- used, err :=
resources.NewResourceFromConf(tc.conf.Resources.Max)
- assert.NilError(t, err, "failed to set allocated
resource: %v", err)
- parent.allocatedResource = resources.Multiply(used, 2)
+func TestShouldTriggerPreemption(t *testing.T) {
+ parentConfig := configs.QueueConfig{
+ Name: "parent",
+ Parent: true,
+ Resources: configs.Resources{
+ Max: map[string]string{"memory": "1000"},
+ },
+ }
+ parent, err := NewConfiguredQueue(parentConfig, nil, false, nil)
+ assert.NilError(t, err)
+ parent.allocatedResource =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000,
"cpu": 2000})
+ parent.quotaPreemptionStartTime = time.Now()
- // Wait till delay expires to let trigger preemption
automatically
- time.Sleep(time.Duration(int64(tc.expectedDelay)+1) *
time.Second)
- if tc.expectedDelay != uint64(0) {
- assert.Equal(t,
parent.shouldTriggerPreemption(), true)
- }
- parent.TryAllocate(nil, nil, nil, false)
-
- // preemption settings should be same as before even
now as trigger is async process
- delay, sTime = parent.getPreemptionSettings()
- assert.Equal(t, delay, tc.expectedDelay)
- if tc.expectedDelay != uint64(0) {
- assert.Equal(t, sTime.IsZero(), false)
- assert.Equal(t,
parent.shouldTriggerPreemption(), true)
- } else {
- assert.Equal(t, sTime.IsZero(), true)
- }
+ leafRes := configs.Resources{
+ Max: map[string]string{"memory": "1000"},
+ }
+ leaf, err := NewConfiguredQueue(configs.QueueConfig{
+ Name: "leaf",
+ Resources: leafRes,
+ }, parent, false, nil)
+ assert.NilError(t, err)
+
+ dynamicLeaf, err := NewConfiguredQueue(configs.QueueConfig{
+ Name: "dynamic-leaf",
+ Resources: leafRes,
+ }, parent, false, nil)
+ assert.NilError(t, err)
+ dynamicLeaf.isManaged = false
- time.Sleep(time.Millisecond * 100)
+ alreadyPreemptionRunning, err := NewConfiguredQueue(configs.QueueConfig{
+ Name: "leaf-already-preemption-running",
+ Resources: leafRes,
+ }, parent, false, nil)
+ assert.NilError(t, err)
+ alreadyPreemptionRunning.setQuotaPreemptionState(true)
- // preemption should have been triggered by now, assert
preemption settings to ensure values are reset
- if tc.expectedDelay != uint64(0) {
- delay, sTime = parent.getPreemptionSettings()
- assert.Equal(t, sTime.IsZero(), true)
- assert.Equal(t, delay, uint64(0))
+ usageExceededMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
+ Name: "leaf-usage-exceeded-max",
+ Resources: leafRes,
+ }, parent, false, nil)
+ assert.NilError(t, err)
+ usageExceededMaxQueue.allocatedResource =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000,
"cpu": 2000})
- // since preemption settings are set,
preemption should not be triggerred again during tryAllocate
- assert.Equal(t,
parent.shouldTriggerPreemption(), false)
- }
+ usageEqualsMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
+ Name: "leaf-usage-equals-max",
+ Resources: leafRes,
+ }, parent, false, nil)
+ assert.NilError(t, err)
+ usageEqualsMaxQueue.allocatedResource =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000,
"cpu": 1000})
+
+ usageNotMatchingMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
+ Name: "leaf-usage-res-not-matching-max-res",
+ Resources: configs.Resources{
+ Max: map[string]string{"cpu": "1000"},
+ },
+ }, parent, false, nil)
+ assert.NilError(t, err)
+ usageNotMatchingMaxQueue.allocatedResource =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000})
+
+ testCases := []struct {
+ name string
+ queue *Queue
+ preconditionResult bool
+ }{
+ {"no usage or max change", leaf, false},
+ {"dynamic queue", dynamicLeaf, false},
+ {"preemption running", alreadyPreemptionRunning, false},
+ {"usage exceeded max, no start time", usageExceededMaxQueue,
false},
+ {"usage exceeded max, start time set", parent, true},
+ {"usage equals max resources", usageEqualsMaxQueue, false},
+ {"usage res not matching max", usageNotMatchingMaxQueue, false},
+ }
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ assert.Equal(t, tc.queue.shouldTriggerPreemption(),
tc.preconditionResult)
})
}
+ // special case: reset time if usage below max
+ usageNotMatchingMaxQueue.quotaPreemptionStartTime =
time.Now().Add(time.Hour)
+ assert.Assert(t, !usageNotMatchingMaxQueue.shouldTriggerPreemption(),
"preemption should not be triggered")
+ assert.Assert(t,
usageNotMatchingMaxQueue.quotaPreemptionStartTime.IsZero(), "start time should
be reset")
}
func TestNewConfiguredQueue(t *testing.T) {
// check variable assignment
properties := getProperties()
resourceConf := getResourceConf()
- // turn resouce config into resource struct
+ // turn resource config into resource struct
resourceStruct, err := resources.NewResourceFromConf(resourceConf)
assert.NilError(t, err, "failed to create new resource from config:
%v", err)
@@ -2491,20 +2540,19 @@ func TestNewConfiguredQueue(t *testing.T) {
assert.DeepEqual(t, properties, parent.template.GetProperties())
assert.Assert(t, resources.Equals(resourceStruct,
parent.template.GetMaxResource()))
assert.Assert(t, resources.Equals(resourceStruct,
parent.template.GetGuaranteedResource()))
- assert.Equal(t, parent.quotaChangePreemptionDelay, uint64(0))
+ assert.Equal(t, parent.preemptionDelay, configs.DefaultPreemptionDelay)
+ assert.Equal(t, parent.quotaPreemptionDelay,
configs.DefaultQuotaPreemptionDelay)
// case 0: managed leaf queue can't use template
leafConfig := configs.QueueConfig{
- Name: "leaf_queue",
- Parent: false,
- Properties: getProperties(),
+ Name: "leaf_queue",
+ Parent: false,
+ Properties: map[string]string{configs.PreemptionDelay: "10s",
+ configs.QuotaPreemptionDelay: "1h"},
Resources: configs.Resources{
Max: getResourceConf(),
Guaranteed: getResourceConf(),
},
- Preemption: configs.Preemption{
- Delay: 500,
- },
}
childLeaf, err := NewConfiguredQueue(leafConfig, parent, false, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
@@ -2517,15 +2565,13 @@ func TestNewConfiguredQueue(t *testing.T) {
childLeafGuaranteed, err :=
resources.NewResourceFromConf(leafConfig.Resources.Guaranteed)
assert.NilError(t, err, "Resource creation failed")
assert.Assert(t, resources.Equals(childLeaf.guaranteedResource,
childLeafGuaranteed))
- assert.Equal(t, childLeaf.quotaChangePreemptionDelay, uint64(500))
+ assert.Equal(t, childLeaf.preemptionDelay, 10*time.Second)
+ assert.Equal(t, childLeaf.quotaPreemptionDelay, 1*time.Hour)
- // case 1: non-leaf can't use template but it can inherit template from
parent
+ // case 1: non-leaf can't use template, but it can inherit template
from parent
NonLeafConfig := configs.QueueConfig{
Name: "nonleaf_queue",
Parent: true,
- Preemption: configs.Preemption{
- Delay: 500,
- },
}
childNonLeaf, err := NewConfiguredQueue(NonLeafConfig, parent, false,
nil)
assert.NilError(t, err, "failed to create queue: %v", err)
@@ -2534,7 +2580,6 @@ func TestNewConfiguredQueue(t *testing.T) {
assert.Equal(t, len(childNonLeaf.properties), 0)
assert.Assert(t, childNonLeaf.guaranteedResource == nil)
assert.Assert(t, childNonLeaf.maxResource == nil)
- assert.Equal(t, childNonLeaf.quotaChangePreemptionDelay, uint64(0))
// case 2: do not send queue event when silence flag is set to true
events.Init()
@@ -2542,17 +2587,12 @@ func TestNewConfiguredQueue(t *testing.T) {
eventSystem.StartServiceWithPublisher(false)
rootConfig := configs.QueueConfig{
Name: "root",
- Preemption: configs.Preemption{
- Delay: 500,
- },
}
-
- rootQ, err := NewConfiguredQueue(rootConfig, nil, true, nil)
+ _, err = NewConfiguredQueue(rootConfig, nil, true, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
time.Sleep(time.Second)
noEvents := eventSystem.Store.CountStoredEvents()
assert.Equal(t, noEvents, uint64(0), "expected 0 event, got %d",
noEvents)
- assert.Equal(t, rootQ.quotaChangePreemptionDelay, uint64(0))
}
func TestResetRunningState(t *testing.T) {
@@ -2575,11 +2615,11 @@ func TestResetRunningState(t *testing.T) {
parent.MarkQueueForRemoval()
assert.Assert(t, parent.IsDraining(), "parent should be marked as
draining")
assert.Assert(t, leaf.IsDraining(), "leaf should be marked as draining")
- err = parent.applyConf(emptyConf, false)
+ _, err = parent.applyConf(emptyConf, false)
assert.NilError(t, err, "failed to update parent")
assert.Assert(t, parent.IsRunning(), "parent should be running again")
assert.Assert(t, leaf.IsDraining(), "leaf should still be marked as
draining")
- err = leaf.applyConf(emptyConf, false)
+ _, err = leaf.applyConf(emptyConf, false)
assert.NilError(t, err, "failed to update leaf")
assert.Assert(t, leaf.IsRunning(), "leaf should be running again")
}
@@ -2910,8 +2950,8 @@ func TestQueueEvents(t *testing.T) {
eventSystem := events.GetEventSystem().(*events.EventSystemImpl)
//nolint:errcheck
eventSystem.StartServiceWithPublisher(false)
queue, err := createRootQueue(nil)
- queue.Name = "testQueue"
assert.NilError(t, err)
+ queue.Name = "testQueue"
app := newApplication(appID0, "default", "root")
queue.AddApplication(app)
@@ -2947,8 +2987,10 @@ func TestQueueEvents(t *testing.T) {
},
},
}
- err = queue.ApplyConf(newConf)
+ var oldMax *resources.Resource
+ oldMax, err = queue.ApplyConf(newConf)
assert.NilError(t, err)
+ assert.Assert(t, oldMax.IsEmpty())
err = common.WaitForCondition(10*time.Millisecond, time.Second, func()
bool {
noEvents = eventSystem.Store.CountStoredEvents()
return noEvents == 3
@@ -3144,55 +3186,42 @@ func TestQueueBackoffProperties(t *testing.T) {
assert.Equal(t, 30*time.Second, leaf3.GetBackoffDelay())
}
-func TestQueue_IsQCPreemptionRunning(t *testing.T) {
- // create the root
- root, err := createManagedQueueMaxApps(nil, "root", true, nil, 1)
- assert.NilError(t, err, "queue create failed")
- parent, err := createManagedQueue(root, "parent", true, nil)
- assert.NilError(t, err, "failed to create parent queue")
-
- var leaf, leaf2, leaf11, leaf111 *Queue
- leaf, err = createManagedQueue(parent, "leaf", true, nil)
- assert.NilError(t, err, "failed to create leaf queue")
- leaf2, err = createManagedQueue(parent, "leaf2", false, nil)
- assert.NilError(t, err, "failed to create leaf2 queue")
-
- leaf11, err = createManagedQueue(leaf, "leaf11", true, nil)
- assert.NilError(t, err, "failed to create leaf11 queue")
-
- leaf111, err = createManagedQueue(leaf11, "leaf111", false, nil)
- assert.NilError(t, err, "failed to create leaf111 queue")
-
- // root.parent is running. any queue located in this hierarchy (both
upwards and downwards) should return true. All branches of parent should return
true.
- parent.isQuotaChangePreemptionRunning = true
- assert.Equal(t, parent.IsQCPreemptionRunning(), true)
- assert.Equal(t, root.IsQCPreemptionRunning(), true)
- assert.Equal(t, leaf111.IsQCPreemptionRunning(), true)
- assert.Equal(t, leaf11.IsQCPreemptionRunning(), true)
- assert.Equal(t, leaf.IsQCPreemptionRunning(), true)
- assert.Equal(t, leaf2.IsQCPreemptionRunning(), true)
-
- // reset
- parent.isQuotaChangePreemptionRunning = false
-
- // root.parent.leaf111 (leaf queue) is running. any queue located in
this hierarchy (upwards) should return true. Other branches of parent should
return false.
- leaf111.isQuotaChangePreemptionRunning = true
- assert.Equal(t, parent.IsQCPreemptionRunning(), true)
- assert.Equal(t, root.IsQCPreemptionRunning(), true)
- assert.Equal(t, leaf111.IsQCPreemptionRunning(), true)
- assert.Equal(t, leaf11.IsQCPreemptionRunning(), true)
- assert.Equal(t, leaf.IsQCPreemptionRunning(), true)
- assert.Equal(t, leaf2.IsQCPreemptionRunning(), false)
-
- // reset
- leaf111.isQuotaChangePreemptionRunning = false
-
- // root.parent.leaf2 (leaf queue) is running. any queue located in this
hierarchy (upwards) should return true. Other branches of parent should return
false.
- leaf2.isQuotaChangePreemptionRunning = true
- assert.Equal(t, parent.IsQCPreemptionRunning(), true)
- assert.Equal(t, root.IsQCPreemptionRunning(), true)
- assert.Equal(t, leaf111.IsQCPreemptionRunning(), false)
- assert.Equal(t, leaf11.IsQCPreemptionRunning(), false)
- assert.Equal(t, leaf.IsQCPreemptionRunning(), false)
- assert.Equal(t, leaf2.IsQCPreemptionRunning(), true)
+func TestQueue_setPreemptionTime(t *testing.T) {
+ root, e := createRootQueue(nil)
+ assert.NilError(t, e, "failed to create basic root queue")
+ tests := []struct {
+ name string
+ oldMaxResource *resources.Resource
+ maxRes map[string]string
+ oldDelay time.Duration
+ delay time.Duration
+ oldStart bool
+ timeChange bool
+ }{
+ {"empty", nil, map[string]string{}, 0, 0, false, false},
+ {"no delays", resources.Zero, map[string]string{"test": "100"},
0, 0, false, false},
+ {"max removed",
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}), nil,
10, 10, false, false},
+ {"max removed update",
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}), nil,
0, 10, true, true},
+ {"delay added",
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}),
map[string]string{"test": "100"}, 0, 10, false, true},
+ {"delay change set start",
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}),
map[string]string{"test": "100"}, 5, 10, true, true},
+ {"delay change no start",
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}),
map[string]string{"test": "100"}, 5, 10, false, false},
+ {"max increase",
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 10}),
map[string]string{"test": "100"}, 10, 10, false, false},
+ {"max lowered",
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}),
map[string]string{"test": "10"}, 10, 10, false, true},
+ {"max lowered 2nd",
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}),
map[string]string{"test": "10"}, 10, 10, true, false},
+ {"delay change max lowered 2nd",
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}),
map[string]string{"test": "10"}, 5, 10, true, true},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ queue, err := createManagedQueue(root, "test", false,
tt.maxRes)
+ assert.NilError(t, err, "queue creation failed
unexpectedly")
+ queue.quotaPreemptionDelay = tt.delay
+ if tt.oldStart {
+ queue.quotaPreemptionStartTime = time.Now()
+ }
+ before := queue.quotaPreemptionStartTime
+ queue.setPreemptionTime(tt.oldMaxResource, tt.oldDelay)
+ after := queue.quotaPreemptionStartTime
+ assert.Equal(t, tt.timeChange, before != after, "time
change not as expected")
+ })
+ }
}
diff --git a/pkg/scheduler/objects/quota_change_preemptor.go
b/pkg/scheduler/objects/quota_preemptor.go
similarity index 56%
rename from pkg/scheduler/objects/quota_change_preemptor.go
rename to pkg/scheduler/objects/quota_preemptor.go
index c302dfb3..17a751a7 100644
--- a/pkg/scheduler/objects/quota_change_preemptor.go
+++ b/pkg/scheduler/objects/quota_preemptor.go
@@ -28,21 +28,23 @@ import (
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
-type QuotaChangePreemptionContext struct {
+type QuotaPreemptionContext struct {
queue *Queue
maxResource *resources.Resource
guaranteedResource *resources.Resource
allocatedResource *resources.Resource
+ preemptingResource *resources.Resource
preemptableResource *resources.Resource
allocations []*Allocation
}
-func NewQuotaChangePreemptor(queue *Queue) *QuotaChangePreemptionContext {
- preemptor := &QuotaChangePreemptionContext{
+func NewQuotaPreemptor(queue *Queue) *QuotaPreemptionContext {
+ preemptor := &QuotaPreemptionContext{
queue: queue,
- maxResource: queue.CloneMaxResource(),
- guaranteedResource: queue.GetGuaranteedResource().Clone(),
- allocatedResource: queue.GetAllocatedResource().Clone(),
+ maxResource: queue.cloneMaxResource(),
+ guaranteedResource: queue.GetGuaranteedResource(),
+ allocatedResource: queue.GetAllocatedResource(),
+ preemptingResource: queue.GetPreemptingResource(),
preemptableResource: nil,
allocations: make([]*Allocation, 0),
}
@@ -50,72 +52,57 @@ func NewQuotaChangePreemptor(queue *Queue)
*QuotaChangePreemptionContext {
return preemptor
}
-func (qcp *QuotaChangePreemptionContext) CheckPreconditions() bool {
- if !qcp.queue.IsManaged() || qcp.queue.IsQCPreemptionRunning() {
- return false
- }
- if
qcp.maxResource.StrictlyGreaterThanOrEqualsOnlyExisting(qcp.queue.GetAllocatedResource())
{
- return false
+func (qpc *QuotaPreemptionContext) tryPreemption() {
+ // Get Preemptable Resource
+ qpc.setPreemptableResources()
+
+ if qpc.queue.IsLeafQueue() {
+ qpc.tryPreemptionInternal()
+ return
}
- return true
-}
+ leafQueues := make(map[*Queue]*resources.Resource)
+ getChildQueuesPreemptableResource(qpc.queue, qpc.preemptableResource,
leafQueues)
-func (qcp *QuotaChangePreemptionContext) tryPreemption() {
- // Get Preemptable Resource
- preemptableResource := qcp.getPreemptableResources()
-
- if !qcp.queue.IsLeafQueue() {
- leafQueues := make(map[*Queue]*resources.Resource)
- getChildQueuesPreemptableResource(qcp.queue,
preemptableResource, leafQueues)
-
- log.Log(log.SchedQuotaChangePreemption).Info("Triggering quota
change preemption for parent queue",
- zap.String("parent queue", qcp.queue.GetQueuePath()),
- zap.String("preemptable resource",
preemptableResource.String()),
- zap.Any("no. of leaf queues with potential victims",
len(leafQueues)),
- )
-
- for leaf, leafPreemptableResource := range leafQueues {
- leafQueueQCPC := NewQuotaChangePreemptor(leaf)
-
log.Log(log.SchedQuotaChangePreemption).Info("Triggering quota change
preemption for leaf queue",
- zap.String("leaf queue", leaf.GetQueuePath()),
- zap.String("max resource",
leafQueueQCPC.maxResource.String()),
- zap.String("guaranteed resource",
leafQueueQCPC.guaranteedResource.String()),
- zap.String("actual allocated resource",
leafQueueQCPC.allocatedResource.String()),
- zap.String("preemptable resource distribution",
leafPreemptableResource.String()),
- )
-
leafQueueQCPC.tryPreemptionInternal(leafPreemptableResource)
- }
- } else {
- qcp.tryPreemptionInternal(preemptableResource)
+ log.Log(log.SchedQuotaChangePreemption).Info("Triggering quota change
preemption for parent queue",
+ zap.String("parent queue", qpc.queue.GetQueuePath()),
+ zap.Stringer("preemptable resource", qpc.preemptableResource),
+ zap.Int("no. of leaf queues with potential victims",
len(leafQueues)),
+ )
+
+ for leaf, leafPreemptableResource := range leafQueues {
+ leafQueueQCPC := NewQuotaPreemptor(leaf)
+ leafQueueQCPC.preemptableResource = leafPreemptableResource
+ leafQueueQCPC.tryPreemptionInternal()
}
}
-func (qcp *QuotaChangePreemptionContext)
tryPreemptionInternal(preemptableResource *resources.Resource) {
+func (qpc *QuotaPreemptionContext) tryPreemptionInternal() {
+ log.Log(log.SchedQuotaChangePreemption).Info("Triggering quota change
preemption for leaf queue",
+ zap.String("leaf queue", qpc.queue.GetQueuePath()),
+ zap.Stringer("max resource", qpc.maxResource),
+ zap.Stringer("guaranteed resource", qpc.guaranteedResource),
+ zap.Stringer("actual allocated resource",
qpc.allocatedResource),
+ zap.Stringer("preemptable resource distribution",
qpc.preemptableResource),
+ )
// quota change preemption has started, so mark the flag
- qcp.queue.MarkQuotaChangePreemptionRunning(true)
-
- qcp.preemptableResource = preemptableResource
+ qpc.queue.setQuotaPreemptionState(true)
// Filter the allocations
- qcp.allocations = qcp.filterAllocations()
+ qpc.filterAllocations()
// Sort the allocations
- qcp.sortAllocations()
+ qpc.sortAllocations()
// Preempt the victims
- qcp.preemptVictims()
+ qpc.preemptVictims()
// quota change preemption has ended, so mark the flag
- qcp.queue.MarkQuotaChangePreemptionRunning(false)
-
- // reset settings
- qcp.queue.resetPreemptionSettings()
+ qpc.queue.setQuotaPreemptionState(false)
}
// getChildQueuesPreemptableResource Compute leaf queue's preemptable resource
distribution from the parent's preemptable resource.
-// Start with immediate children of parent, compute each child distribution
from its parent preemptable resource and repeat the same
+// Starts with the immediate children of the parent, computes each child's distribution
from its parent's preemptable resource, and repeats the same
// for all children at all levels until end leaf queues processed recursively.
-
// In order to achieve a fair distribution of parent's preemptable resource
among its children,
// Higher (relatively) the usage is, higher the preemptable resource would be
resulted in.
// Usage above guaranteed (if set) is only considered to derive the
preemptable resource.
@@ -135,15 +122,17 @@ func getChildQueuesPreemptableResource(queue *Queue,
parentPreemptableResource *
// In case guaranteed not set, entire used resources is treated as
preemptable resource.
// Total preemptable resource (sum of all children's preemptable
resources) would be calculated along the way.
for _, child := range children {
+ allocated := child.GetAllocatedResource()
+ guaranteed := child.GetGuaranteedResource()
// Skip child if there is no usage or usage below or equals
guaranteed
- if child.GetAllocatedResource().IsEmpty() ||
child.GetGuaranteedResource().StrictlyGreaterThanOrEqualsOnlyExisting(child.GetAllocatedResource())
{
+ if allocated.IsEmpty() ||
guaranteed.StrictlyGreaterThanOrEqualsOnlyExisting(allocated) {
continue
}
var usedResource *resources.Resource
- if !child.GetGuaranteedResource().IsEmpty() {
- usedResource =
resources.SubOnlyExisting(child.GetGuaranteedResource(),
child.GetAllocatedResource())
+ if !guaranteed.IsEmpty() {
+ usedResource = resources.SubOnlyExisting(guaranteed,
allocated)
} else {
- usedResource = child.GetAllocatedResource()
+ usedResource = allocated
}
preemptableResource := resources.NewResource()
for k, v := range usedResource.Resources {
@@ -177,16 +166,15 @@ func getChildQueuesPreemptableResource(queue *Queue,
parentPreemptableResource *
}
}
-// getPreemptableResources Get the preemptable resources for the queue
+// setPreemptableResources Computes and stores the preemptable resources for the queue
// Subtracting the usage from the max resource gives the preemptable resources.
// It could contain both positive and negative values. Only negative values
are preemptable.
-func (qcp *QuotaChangePreemptionContext) getPreemptableResources()
*resources.Resource {
- maxRes := qcp.queue.CloneMaxResource()
- used := resources.SubOnlyExisting(qcp.allocatedResource,
qcp.queue.GetPreemptingResource())
- if maxRes.IsEmpty() || used.IsEmpty() {
- return nil
+func (qpc *QuotaPreemptionContext) setPreemptableResources() {
+ used := resources.SubOnlyExisting(qpc.allocatedResource,
qpc.preemptingResource)
+ if qpc.maxResource.IsEmpty() || used.IsEmpty() {
+ return
}
- actual := resources.SubOnlyExisting(maxRes, used)
+ actual := resources.SubOnlyExisting(qpc.maxResource, used)
preemptableResource := resources.NewResource()
// Keep only the resource type which needs to be preempted
for k, v := range actual.Resources {
@@ -195,25 +183,25 @@ func (qcp *QuotaChangePreemptionContext)
getPreemptableResources() *resources.Re
}
}
if preemptableResource.IsEmpty() {
- return nil
+ return
}
- return preemptableResource
+ qpc.preemptableResource = preemptableResource
}
// filterAllocations Filter the allocations running in the queue suitable for
choosing as victims
-func (qcp *QuotaChangePreemptionContext) filterAllocations() []*Allocation {
- if resources.IsZero(qcp.preemptableResource) {
- return nil
+func (qpc *QuotaPreemptionContext) filterAllocations() {
+ if resources.IsZero(qpc.preemptableResource) {
+ return
}
- var allocations []*Allocation
- apps := qcp.queue.GetCopyOfApps()
+ qpc.allocations = make([]*Allocation, 0, 5)
+ apps := qpc.queue.GetCopyOfApps()
// Traverse allocations running in the queue
for _, app := range apps {
appAllocations := app.GetAllAllocations()
for _, alloc := range appAllocations {
// at least one of a preemptable resource type should
match with a potential victim
- if
!qcp.preemptableResource.MatchAny(alloc.GetAllocatedResource()) {
+ if
!qpc.preemptableResource.MatchAny(alloc.GetAllocatedResource()) {
continue
}
@@ -231,20 +219,19 @@ func (qcp *QuotaChangePreemptionContext)
filterAllocations() []*Allocation {
if alloc.IsPreempted() {
continue
}
- allocations = append(allocations, alloc)
+ qpc.allocations = append(qpc.allocations, alloc)
}
}
log.Log(log.SchedQuotaChangePreemption).Info("Filtering allocations",
- zap.String("queue", qcp.queue.GetQueuePath()),
- zap.Int("filtered allocations", len(allocations)),
+ zap.String("queue", qpc.queue.GetQueuePath()),
+ zap.Int("filtered allocations", len(qpc.allocations)),
)
- return allocations
}
// sortAllocations Sort the allocations running in the queue
-func (qcp *QuotaChangePreemptionContext) sortAllocations() {
- if len(qcp.allocations) > 0 {
- SortAllocations(qcp.allocations)
+func (qpc *QuotaPreemptionContext) sortAllocations() {
+ if len(qpc.allocations) > 0 {
+ SortAllocations(qpc.allocations)
}
}
@@ -252,51 +239,52 @@ func (qcp *QuotaChangePreemptionContext)
sortAllocations() {
// When both max and guaranteed resources are set and equal, to comply with
law of preemption "Ensure usage doesn't go below guaranteed resources",
// preempt victims on best effort basis. So, preempt victims as close as
possible to the required resource.
// Otherwise, exceeding above the required resources slightly is acceptable
for now.
-func (qcp *QuotaChangePreemptionContext) preemptVictims() {
- if len(qcp.allocations) == 0 {
+func (qpc *QuotaPreemptionContext) preemptVictims() {
+ if len(qpc.allocations) == 0 {
log.Log(log.SchedQuotaChangePreemption).Warn("BUG: No victims
to enforce quota change through preemption",
- zap.String("queue", qcp.queue.GetQueuePath()))
+ zap.String("queue", qpc.queue.GetQueuePath()))
return
}
apps := make(map[*Application][]*Allocation)
victimsTotalResource := resources.NewResource()
log.Log(log.SchedQuotaChangePreemption).Info("Found victims for quota
change preemption",
- zap.String("queue", qcp.queue.GetQueuePath()),
- zap.Int("total victims", len(qcp.allocations)),
- zap.String("max resources", qcp.maxResource.String()),
- zap.String("guaranteed resources",
qcp.guaranteedResource.String()),
- zap.String("allocated resources",
qcp.allocatedResource.String()),
- zap.String("preemptable resources",
qcp.preemptableResource.String()),
- zap.Bool("isGuaranteedSet", qcp.guaranteedResource.IsEmpty()),
+ zap.String("queue", qpc.queue.GetQueuePath()),
+ zap.Int("total victims", len(qpc.allocations)),
+ zap.Stringer("max resources", qpc.maxResource),
+ zap.Stringer("guaranteed resources", qpc.guaranteedResource),
+ zap.Stringer("allocated resources", qpc.allocatedResource),
+ zap.Stringer("preemptable resources", qpc.preemptableResource),
+ zap.Bool("isGuaranteedSet", qpc.guaranteedResource.IsEmpty()),
)
- for _, victim := range qcp.allocations {
- if
!qcp.preemptableResource.FitInMaxUndef(victim.GetAllocatedResource()) {
+ for _, victim := range qpc.allocations {
+ victimAlloc := victim.GetAllocatedResource()
+ if !qpc.preemptableResource.FitInMaxUndef(victimAlloc) {
continue
}
- application := qcp.queue.GetApplication(victim.applicationID)
+ application := qpc.queue.GetApplication(victim.applicationID)
if application == nil {
log.Log(log.SchedQuotaChangePreemption).Warn("BUG:
application not found in queue",
- zap.String("queue", qcp.queue.GetQueuePath()),
+ zap.String("queue", qpc.queue.GetQueuePath()),
zap.String("application", victim.applicationID))
continue
}
// Keep collecting the victims until preemptable resource
reaches and subtract the usage
- if
qcp.preemptableResource.StrictlyGreaterThanOnlyExisting(victimsTotalResource) {
+ if
qpc.preemptableResource.StrictlyGreaterThanOnlyExisting(victimsTotalResource) {
apps[application] = append(apps[application], victim)
-
qcp.allocatedResource.SubFrom(victim.GetAllocatedResource())
+ qpc.allocatedResource.SubFrom(victimAlloc)
}
// Has usage gone below the guaranteed resources?
// If yes, revert the recently added victim steps completely
and try next victim.
- if !qcp.guaranteedResource.IsEmpty() &&
qcp.guaranteedResource.StrictlyGreaterThanOnlyExisting(qcp.allocatedResource) {
+ if !qpc.guaranteedResource.IsEmpty() &&
qpc.guaranteedResource.StrictlyGreaterThanOnlyExisting(qpc.allocatedResource) {
victims := apps[application]
exceptRecentlyAddedVictims := victims[:len(victims)-1]
apps[application] = exceptRecentlyAddedVictims
-
qcp.allocatedResource.AddTo(victim.GetAllocatedResource())
-
victimsTotalResource.SubFrom(victim.GetAllocatedResource())
+ qpc.allocatedResource.AddTo(victimAlloc)
+ victimsTotalResource.SubFrom(victimAlloc)
} else {
-
victimsTotalResource.AddTo(victim.GetAllocatedResource())
+ victimsTotalResource.AddTo(victimAlloc)
}
}
@@ -305,28 +293,22 @@ func (qcp *QuotaChangePreemptionContext) preemptVictims()
{
for _, victim := range victims {
err := victim.MarkPreempted()
if err != nil {
-
log.Log(log.SchedRequiredNodePreemption).Warn("allocation is already released,
so not proceeding further on the daemon set preemption process",
- zap.String("applicationID",
victim.applicationID),
+
log.Log(log.SchedRequiredNodePreemption).Warn("allocation is already released,
ignoring in quota preemption process",
+ zap.String("applicationID",
victim.GetApplicationID()),
zap.String("allocationKey",
victim.GetAllocationKey()))
continue
}
-
log.Log(log.SchedQuotaChangePreemption).Info("Preempting victims for quota
change preemption",
- zap.String("queue",
qcp.queue.GetQueuePath()),
- zap.String("victim allocation key",
victim.allocationKey),
- zap.String("victim allocated
resources", victim.GetAllocatedResource().String()),
- zap.String("victim application",
victim.applicationID),
- zap.String("victim node",
victim.GetNodeID()),
- )
-
qcp.queue.IncPreemptingResource(victim.GetAllocatedResource())
-
victim.SendPreemptedByQuotaChangeEvent(qcp.queue.GetQueuePath())
+
log.Log(log.SchedQuotaChangePreemption).Info("Preempting victim for quota
change preemption",
+ zap.String("queue",
qpc.queue.GetQueuePath()),
+ zap.String("allocationKey",
victim.GetAllocationKey()),
+ zap.Stringer("allocatedResources",
victim.GetAllocatedResource()),
+ zap.String("applicationID",
victim.GetApplicationID()),
+ zap.String("nodeID",
victim.GetNodeID()))
+
qpc.queue.IncPreemptingResource(victim.GetAllocatedResource())
+
victim.SendPreemptedByQuotaChangeEvent(qpc.queue.GetQueuePath())
}
app.notifyRMAllocationReleased(victims,
si.TerminationType_PREEMPTED_BY_SCHEDULER,
- "preempting allocations to enforce new max
quota for queue : "+qcp.queue.GetQueuePath())
+ "preempting allocations to enforce new max
quota for queue : "+qpc.queue.GetQueuePath())
}
}
}
-
-// only for testing
-func (qcp *QuotaChangePreemptionContext) getVictims() []*Allocation {
- return qcp.allocations
-}
diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go
b/pkg/scheduler/objects/quota_preemptor_test.go
similarity index 89%
rename from pkg/scheduler/objects/quota_change_preemptor_test.go
rename to pkg/scheduler/objects/quota_preemptor_test.go
index d578122a..04b923ec 100644
--- a/pkg/scheduler/objects/quota_change_preemptor_test.go
+++ b/pkg/scheduler/objects/quota_preemptor_test.go
@@ -20,7 +20,6 @@ package objects
import (
"testing"
-
"time"
"gotest.tools/v3/assert"
@@ -30,88 +29,6 @@ import (
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
-func TestQuotaChangeCheckPreconditions(t *testing.T) {
- parentConfig := configs.QueueConfig{
- Name: "parent",
- Parent: true,
- Resources: configs.Resources{
- Max: map[string]string{"memory": "1000"},
- },
- }
- parent, err := NewConfiguredQueue(parentConfig, nil, false, nil)
- assert.NilError(t, err)
- parent.allocatedResource =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000,
"cpu": 2000})
-
- leafRes := configs.Resources{
- Max: map[string]string{"memory": "1000"},
- }
- leaf, err := NewConfiguredQueue(configs.QueueConfig{
- Name: "leaf",
- Resources: leafRes,
- }, parent, false, nil)
- assert.NilError(t, err)
-
- dynamicLeaf, err := NewConfiguredQueue(configs.QueueConfig{
- Name: "dynamic-leaf",
- Resources: leafRes,
- }, parent, false, nil)
- assert.NilError(t, err)
- dynamicLeaf.isManaged = false
-
- alreadyPreemptionRunning, err := NewConfiguredQueue(configs.QueueConfig{
- Name: "leaf-already-preemption-running",
- Resources: leafRes,
- }, parent, false, nil)
- assert.NilError(t, err)
-
- usageExceededMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
- Name: "leaf-usage-exceeded-max",
- Resources: leafRes,
- }, parent, false, nil)
- assert.NilError(t, err)
- usageExceededMaxQueue.allocatedResource =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000,
"cpu": 2000})
-
- usageEqualsMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
- Name: "leaf-usage-equals-max",
- Resources: leafRes,
- }, parent, false, nil)
- assert.NilError(t, err)
- usageEqualsMaxQueue.allocatedResource =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000,
"cpu": 1000})
-
- usageNotMatchingMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
- Name: "leaf-usage-res-not-matching-max-res",
- Resources: configs.Resources{
- Max: map[string]string{"cpu": "1000"},
- },
- }, parent, false, nil)
- assert.NilError(t, err)
- usageNotMatchingMaxQueue.allocatedResource =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000})
-
- testCases := []struct {
- name string
- queue *Queue
- preemptionRunning bool
- preconditionResult bool
- }{
- {"parent queue", parent, false, true},
- {"leaf queue", leaf, false, false},
- {"dynamic leaf queue", dynamicLeaf, false, false},
- {"leaf queue, usage exceeded max resources",
usageExceededMaxQueue, false, true},
- {"leaf queue, usage equals max resources", usageEqualsMaxQueue,
false, false},
- {"leaf queue, already preemption process started or running",
alreadyPreemptionRunning, true, false},
- }
- for _, tc := range testCases {
- t.Run(tc.name, func(t *testing.T) {
-
tc.queue.MarkQuotaChangePreemptionRunning(tc.preemptionRunning)
- preemptor := NewQuotaChangePreemptor(tc.queue)
- assert.Equal(t, preemptor.CheckPreconditions(),
tc.preconditionResult)
- })
- }
- // Since parent's leaf queue "leaf-already-preemption-running" is
running, parent preconditions passed earlier should fail now
- preemptor := NewQuotaChangePreemptor(parent)
- assert.Equal(t, preemptor.CheckPreconditions(), false)
-}
-
func TestQuotaChangeGetPreemptableResource(t *testing.T) {
leaf, err := NewConfiguredQueue(configs.QueueConfig{
Name: "leaf",
@@ -137,8 +54,9 @@ func TestQuotaChangeGetPreemptableResource(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
tc.queue.maxResource = tc.maxResource
tc.queue.allocatedResource = tc.usedResource
- preemptor := NewQuotaChangePreemptor(tc.queue)
- assert.Equal(t,
resources.Equals(preemptor.getPreemptableResources(), tc.preemptable), true)
+ preemptor := NewQuotaPreemptor(tc.queue)
+ preemptor.setPreemptableResources()
+ assert.Equal(t,
resources.Equals(preemptor.preemptableResource, tc.preemptable), true)
})
}
}
@@ -191,10 +109,10 @@ func TestQuotaChangeFilterVictims(t *testing.T) {
err = asks[5].SetReleased(true)
assert.NilError(t, err)
}
- preemptor := NewQuotaChangePreemptor(tc.queue)
+ preemptor := NewQuotaPreemptor(tc.queue)
preemptor.preemptableResource = tc.preemptableResource
- allocations := preemptor.filterAllocations()
- assert.Equal(t, len(allocations),
tc.expectedAllocationsCount)
+ preemptor.filterAllocations()
+ assert.Equal(t, len(preemptor.allocations),
tc.expectedAllocationsCount)
removeAllocationAsks(node, asks)
resetQueue(leaf)
})
@@ -267,10 +185,10 @@ func TestQuotaChangeTryPreemption(t *testing.T) {
assignAllocationsToQueue(asks, leaf)
leaf.maxResource = tc.newMax
leaf.guaranteedResource = tc.guaranteed
- preemptor := NewQuotaChangePreemptor(tc.queue)
+ preemptor := NewQuotaPreemptor(tc.queue)
preemptor.allocations = asks
preemptor.tryPreemption()
- assert.Equal(t, len(preemptor.getVictims()),
tc.totalExpectedVictims)
+ assert.Equal(t, len(preemptor.allocations),
tc.totalExpectedVictims)
var victimsCount int
for _, a := range asks {
if a.IsPreempted() {
@@ -379,9 +297,9 @@ func TestQuotaChangeTryPreemptionWithDifferentResTypes(t
*testing.T) {
assignAllocationsToQueue(asks, leaf)
leaf.maxResource = tc.newMax
leaf.guaranteedResource = tc.guaranteed
- preemptor := NewQuotaChangePreemptor(tc.queue)
+ preemptor := NewQuotaPreemptor(tc.queue)
preemptor.tryPreemption()
- assert.Equal(t, len(preemptor.getVictims()),
v.totalExpectedVictims)
+ assert.Equal(t, len(preemptor.allocations),
v.totalExpectedVictims)
var victimsCount int
for _, a := range asks {
if a.IsPreempted() {
@@ -628,7 +546,7 @@ func TestQuotaChangeTryPreemptionForParentQueue(t
*testing.T) {
assignAllocationsToQueue(v, q)
}
tc.queue.maxResource = tc.newMax
- preemptor := NewQuotaChangePreemptor(tc.queue)
+ preemptor := NewQuotaPreemptor(tc.queue)
preemptor.tryPreemption()
for q, asks := range tc.victims {
var victimsCount int
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 870f46cb..82c398e0 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -65,6 +65,7 @@ type PartitionContext struct {
reservations int // number of
reservations
placeholderAllocations int // number of
placeholder allocations
preemptionEnabled bool // whether
preemption is enabled or not
+ quotaPreemptionEnabled bool // whether quota
preemption is enabled or not
foreignAllocs map[string]*objects.Allocation // foreign
(non-Yunikorn) allocations
appQueueMapping *objects.AppQueueMapping // appID mapping
to queues
@@ -166,6 +167,7 @@ func (pc *PartitionContext) updateNodeSortingPolicy(conf
configs.PartitionConfig
// NOTE: this is a lock free call. It should only be called holding the
PartitionContext lock.
func (pc *PartitionContext) updatePreemption(conf configs.PartitionConfig) {
pc.preemptionEnabled = conf.Preemption.Enabled == nil ||
*conf.Preemption.Enabled
+ pc.quotaPreemptionEnabled = conf.Preemption.QuotaPreemptionEnabled !=
nil && *conf.Preemption.QuotaPreemptionEnabled
}
func (pc *PartitionContext) updatePartitionDetails(conf
configs.PartitionConfig) error {
@@ -189,12 +191,12 @@ func (pc *PartitionContext) updatePartitionDetails(conf
configs.PartitionConfig)
queueConf := conf.Queues[0]
root := pc.root
// update the root queue
- if err := root.ApplyConf(queueConf); err != nil {
+ if _, err = root.ApplyConf(queueConf); err != nil {
return err
}
- root.UpdateQueueProperties()
+ root.UpdateQueueProperties(nil)
// update the rest of the queues recursively
- if err := pc.updateQueues(queueConf.Queues, root); err != nil {
+ if err = pc.updateQueues(queueConf.Queues, root); err != nil {
return err
}
// update limit settings: start at the root
@@ -234,16 +236,17 @@ func (pc *PartitionContext) updateQueues(config
[]configs.QueueConfig, parent *o
pathName := parentPath + queueConfig.Name
queue := pc.getQueueInternal(pathName)
var err error
+ var oldMax *resources.Resource
if queue == nil {
queue, err = objects.NewConfiguredQueue(queueConfig,
parent, false, pc.appQueueMapping)
} else {
- err = queue.ApplyConf(queueConfig)
+ oldMax, err = queue.ApplyConf(queueConfig)
}
if err != nil {
return err
}
// special call to convert to a real policy from the property
- queue.UpdateQueueProperties()
+ queue.UpdateQueueProperties(oldMax)
if err = pc.updateQueues(queueConfig.Queues, queue); err != nil
{
return err
}
@@ -816,7 +819,7 @@ func (pc *PartitionContext) tryAllocate()
*objects.AllocationResult {
return nil
}
// try allocating from the root down
- result := pc.root.TryAllocate(pc.GetNodeIterator,
pc.GetFullNodeIterator, pc.GetNode, pc.IsPreemptionEnabled())
+ result := pc.root.TryAllocate(pc.GetNodeIterator,
pc.GetFullNodeIterator, pc.GetNode, pc.IsPreemptionEnabled(),
pc.IsQuotaPreemptionEnabled())
if result != nil {
return pc.allocate(result)
}
@@ -1645,6 +1648,12 @@ func (pc *PartitionContext) IsPreemptionEnabled() bool {
return pc.preemptionEnabled
}
+func (pc *PartitionContext) IsQuotaPreemptionEnabled() bool {
+ pc.RLock()
+ defer pc.RUnlock()
+ return pc.quotaPreemptionEnabled
+}
+
func (pc *PartitionContext) moveTerminatedApp(appID string) {
app := pc.getApplication(appID)
// nothing to do if the app is not found on the partition
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index ebca07f5..e62e501a 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -1050,14 +1050,14 @@ func TestAddAppTaskGroup(t *testing.T) {
// queue now has fair as sort policy app add should fail
queue := partition.GetQueue(defQueue)
- err = queue.ApplyConf(configs.QueueConfig{
+ _, err = queue.ApplyConf(configs.QueueConfig{
Name: "default",
Parent: false,
Queues: nil,
Properties: map[string]string{configs.ApplicationSortPolicy:
"fair"},
})
assert.NilError(t, err, "updating queue should not have failed")
- queue.UpdateQueueProperties()
+ queue.UpdateQueueProperties(nil)
err = partition.AddApplication(app)
if err == nil || partition.getApplication(appID2) != nil {
t.Errorf("add application should have failed due to queue sort
policy but did not")
@@ -3775,22 +3775,28 @@ func TestUpdatePreemption(t *testing.T) {
partition, err := newBasePartition()
assert.NilError(t, err, "Partition creation failed")
- assert.Assert(t, partition.IsPreemptionEnabled(), "preeemption should
be enabled by default")
+ assert.Assert(t, partition.IsPreemptionEnabled(), "preemption should be
enabled by default")
+ assert.Assert(t, !partition.IsQuotaPreemptionEnabled(), "quota
preemption should be disabled by default")
partition.updatePreemption(configs.PartitionConfig{})
- assert.Assert(t, partition.IsPreemptionEnabled(), "preeemption should
be enabled by empty config")
+ assert.Assert(t, partition.IsPreemptionEnabled(), "preemption should be
enabled by empty config")
+ assert.Assert(t, !partition.IsQuotaPreemptionEnabled(), "quota
preemption should be disabled by default")
partition.updatePreemption(configs.PartitionConfig{Preemption:
configs.PartitionPreemptionConfig{}})
- assert.Assert(t, partition.IsPreemptionEnabled(), "preeemption should
be enabled by empty preemption section")
+ assert.Assert(t, partition.IsPreemptionEnabled(), "preemption should be
enabled by empty preemption section")
+ assert.Assert(t, !partition.IsQuotaPreemptionEnabled(), "quota
preemption should be disabled by empty preemption section")
- partition.updatePreemption(configs.PartitionConfig{Preemption:
configs.PartitionPreemptionConfig{Enabled: nil}})
- assert.Assert(t, partition.IsPreemptionEnabled(), "preeemption should
be enabled by explicit nil")
+ partition.updatePreemption(configs.PartitionConfig{Preemption:
configs.PartitionPreemptionConfig{Enabled: nil, QuotaPreemptionEnabled: nil}})
+ assert.Assert(t, partition.IsPreemptionEnabled(), "preemption should be
enabled by explicit nil")
+ assert.Assert(t, !partition.IsQuotaPreemptionEnabled(), "quota
preemption should be disabled by explicit nil")
- partition.updatePreemption(configs.PartitionConfig{Preemption:
configs.PartitionPreemptionConfig{Enabled: &True}})
- assert.Assert(t, partition.IsPreemptionEnabled(), "preeemption should
be enabled by explicit true")
+ partition.updatePreemption(configs.PartitionConfig{Preemption:
configs.PartitionPreemptionConfig{Enabled: &True, QuotaPreemptionEnabled:
&True}})
+ assert.Assert(t, partition.IsPreemptionEnabled(), "preemption should be
enabled by explicit true")
+ assert.Assert(t, partition.IsQuotaPreemptionEnabled(), "quota
preemption should be enabled by explicit true")
- partition.updatePreemption(configs.PartitionConfig{Preemption:
configs.PartitionPreemptionConfig{Enabled: &False}})
- assert.Assert(t, !partition.IsPreemptionEnabled(), "preeemption should
be disabled by explicit false")
+ partition.updatePreemption(configs.PartitionConfig{Preemption:
configs.PartitionPreemptionConfig{Enabled: &False, QuotaPreemptionEnabled:
&False}})
+ assert.Assert(t, !partition.IsPreemptionEnabled(), "preemption should
be disabled by explicit false")
+ assert.Assert(t, !partition.IsQuotaPreemptionEnabled(), "quota
preemption should be disabled by explicit false")
}
func TestUpdateNodeSortingPolicy(t *testing.T) {
diff --git a/pkg/webservice/dao/partition_info.go
b/pkg/webservice/dao/partition_info.go
index ed4b4b10..07e27293 100644
--- a/pkg/webservice/dao/partition_info.go
+++ b/pkg/webservice/dao/partition_info.go
@@ -19,11 +19,12 @@
package dao
type PartitionInfo struct {
- ClusterID string `json:"clusterId"` //
no omitempty, cluster id should not be empty
- Name string `json:"name"` //
no omitempty, name should not be empty
- Capacity PartitionCapacity `json:"capacity"` //
no omitempty, omitempty doesn't work on a structure value
- NodeSortingPolicy NodeSortingPolicy `json:"nodeSortingPolicy"` //
no omitempty, omitempty doesn't work on a structure value
- PreemptionEnabled bool `json:"preemptionEnabled"` //
no omitempty, false shows preemption status better
+ ClusterID string `json:"clusterId"`
// no omitempty, cluster id should not be empty
+ Name string `json:"name"`
// no omitempty, name should not be empty
+ Capacity PartitionCapacity `json:"capacity"`
// no omitempty, omitempty doesn't work on a structure value
+ NodeSortingPolicy NodeSortingPolicy `json:"nodeSortingPolicy"`
// no omitempty, omitempty doesn't work on a structure value
+ PreemptionEnabled bool `json:"preemptionEnabled"`
// no omitempty, false shows preemption status better
+ QuotaPreemptionEnabled bool
`json:"quotaPreemptionEnabled"` // no omitempty, false shows quota preemption
status better
TotalNodes int `json:"totalNodes,omitempty"`
Applications map[string]int
`json:"applications,omitempty"`
TotalContainers int
`json:"totalContainers,omitempty"`
diff --git a/pkg/webservice/dao/queue_info.go b/pkg/webservice/dao/queue_info.go
index 7abe6f49..607d366a 100644
--- a/pkg/webservice/dao/queue_info.go
+++ b/pkg/webservice/dao/queue_info.go
@@ -52,6 +52,7 @@ type PartitionQueueDAOInfo struct {
PreemptionEnabled bool
`json:"preemptionEnabled"` // no omitempty, false shows preemption status better
IsPreemptionFence bool
`json:"isPreemptionFence"` // no omitempty, a false value gives a quick way to
understand whether it's fenced.
PreemptionDelay string
`json:"preemptionDelay,omitempty"`
+ QuotaPreemptionDelay string
`json:"quotaPreemptionDelay,omitempty"`
IsPriorityFence bool `json:"isPriorityFence"`
// no omitempty, a false value gives a quick way to understand whether it's
fenced.
PriorityOffset int32
`json:"priorityOffset,omitempty"`
}
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index a76f76c5..02532862 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -1018,6 +1018,7 @@ func getPartitionInfoDAO(lists
map[string]*scheduler.PartitionContext) []*dao.Pa
partitionInfo.State = partitionContext.GetCurrentState()
partitionInfo.LastStateTransitionTime =
partitionContext.GetStateTime().UnixNano()
partitionInfo.PreemptionEnabled =
partitionContext.IsPreemptionEnabled()
+ partitionInfo.QuotaPreemptionEnabled =
partitionContext.IsQuotaPreemptionEnabled()
capacityInfo := dao.PartitionCapacity{}
capacity := partitionContext.GetTotalPartitionResource()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]