This is an automated email from the ASF dual-hosted git repository.

wilfreds pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new 656455ab [YUNIKORN-3193] Quota preemption to be turned off by default 
(#1061)
656455ab is described below

commit 656455abb3523ba3802d1e79513b9f34a2cc69da
Author: Wilfred Spiegelenburg <[email protected]>
AuthorDate: Mon Jan 19 11:21:01 2026 +1100

    [YUNIKORN-3193] Quota preemption to be turned off by default (#1061)
    
    Quota preemption is a new functionality. Quota preemption is turned off
    by default at the partition level via a config. To use quota preemption
    the 'QuotaPreemptionEnabled' flag in the partition preemption config
    must be set to 'true'.
    ----
    partitions:
      - name: default
        preemption:
          quotapreemptionenabled: true
    ----
    When turned on the delay for quota preemption is set via the queue
    property: `preemption.quota.delay`. The property uses the same syntax as
    the preemption delay: a Golang duration in string form.
    The default delay is '0s', which means no quota preemption is triggered.
    
    Quota preemption is only supported on managed queues, not on dynamic
    queues.
    
    Closes: #1061
    
    Signed-off-by: Wilfred Spiegelenburg <[email protected]>
    (cherry picked from commit 98832728ec40ddd105a48037b06f0399f34ca463)
---
 pkg/common/configs/config.go                       |  37 +-
 pkg/common/configs/config_test.go                  |   7 +-
 pkg/common/configs/configvalidator.go              |  33 +-
 pkg/common/configs/configvalidator_test.go         |  28 --
 pkg/scheduler/objects/preemption_utilities_test.go |   2 +-
 pkg/scheduler/objects/queue.go                     | 396 +++++++++----------
 pkg/scheduler/objects/queue_test.go                | 417 +++++++++++----------
 ...uota_change_preemptor.go => quota_preemptor.go} | 216 +++++------
 ...e_preemptor_test.go => quota_preemptor_test.go} | 104 +----
 pkg/scheduler/partition.go                         |  21 +-
 pkg/scheduler/partition_test.go                    |  28 +-
 pkg/webservice/dao/partition_info.go               |  11 +-
 pkg/webservice/dao/queue_info.go                   |   1 +
 pkg/webservice/handlers.go                         |   1 +
 14 files changed, 615 insertions(+), 687 deletions(-)

diff --git a/pkg/common/configs/config.go b/pkg/common/configs/config.go
index 792f833b..50a35fe6 100644
--- a/pkg/common/configs/config.go
+++ b/pkg/common/configs/config.go
@@ -32,14 +32,14 @@ import (
        "github.com/apache/yunikorn-core/pkg/log"
 )
 
-// The configuration can contain multiple partitions. Each partition contains 
the queue definition for a logical
+// SchedulerConfig can contain multiple partitions. Each partition contains 
the queue definition for a logical
 // set of scheduler resources.
 type SchedulerConfig struct {
        Partitions []PartitionConfig
        Checksum   string `yaml:",omitempty" json:",omitempty"`
 }
 
-// The partition object for each partition:
+// PartitionConfig for each partition:
 // - the name of the partition
 // - a list of sub or child queues
 // - a list of placement rule definition objects
@@ -60,12 +60,13 @@ type UserGroupResolver struct {
        Type string `yaml:"type,omitempty" json:"type,omitempty"`
 }
 
-// The partition preemption configuration
+// PartitionPreemptionConfig defines global flags for both preemption types
 type PartitionPreemptionConfig struct {
-       Enabled *bool `yaml:",omitempty" json:",omitempty"`
+       Enabled                *bool `yaml:",omitempty" json:",omitempty"`
+       QuotaPreemptionEnabled *bool `yaml:",omitempty" json:",omitempty"`
 }
 
-// The queue object for each queue:
+// QueueConfig object for each queue:
 // - the name of the queue
 // - a resources object to specify resource limits on the queue
 // - the maximum number of applications that can run in the queue
@@ -84,20 +85,16 @@ type QueueConfig struct {
        ChildTemplate   ChildTemplate     `yaml:",omitempty" json:",omitempty"`
        Queues          []QueueConfig     `yaml:",omitempty" json:",omitempty"`
        Limits          []Limit           `yaml:",omitempty" json:",omitempty"`
-       Preemption      Preemption        `yaml:",omitempty" json:",omitempty"`
-}
-
-type Preemption struct {
-       Delay uint64 `yaml:",omitempty" json:",omitempty"`
 }
 
+// ChildTemplate set on a parent queue with settings to be applied to the 
child created via a placement rule.
 type ChildTemplate struct {
        MaxApplications uint64            `yaml:",omitempty" json:",omitempty"`
        Properties      map[string]string `yaml:",omitempty" json:",omitempty"`
        Resources       Resources         `yaml:",omitempty" json:",omitempty"`
 }
 
-// The resource limits to set on the queue. The definition allows for an 
unlimited number of types to be used.
+// The Resources limit to apply on the queue. The definition allows for an 
unlimited number of types to be used.
 // The mapping to "known" resources is not handled here.
 // - guaranteed resources
 // - max resources
@@ -106,12 +103,12 @@ type Resources struct {
        Max        map[string]string `yaml:",omitempty" json:",omitempty"`
 }
 
-// The queue placement rule definition
+// The PlacementRule definition:
 // - the name of the rule
 // - create flag: can the rule create a queue
 // - user and group filter to be applied on the callers
 // - rule link to allow setting a rule to generate the parent
-// - value a generic value interpreted depending on the rule type (i.e queue 
name for the "fixed" rule
+// - value a generic value interpreted depending on the rule type (i.e. queue 
name for the "fixed" rule
 // or the application label name for the "tag" rule)
 type PlacementRule struct {
        Name   string
@@ -121,7 +118,7 @@ type PlacementRule struct {
        Value  string         `yaml:",omitempty" json:",omitempty"`
 }
 
-// The user and group filter for a rule.
+// Filter for users and groups for a PlacementRule.
 // - type of filter (allow or deny filter, empty means allow)
 // - list of users to filter (maybe empty)
 // - list of groups to filter (maybe empty)
@@ -132,13 +129,13 @@ type Filter struct {
        Groups []string `yaml:",omitempty" json:",omitempty"`
 }
 
-// A list of limit objects to define limits for a partition or queue
+// Limits is a list of Limit objects to define user and group limits for a 
partition or queue.
 type Limits struct {
        Limit []Limit
 }
 
-// The limit object to specify user and or group limits at different levels in 
the partition or queues
-// Different limits for the same user or group may be defined at different 
levels in the hierarchy
+// A Limit object to specify user and or group limits at different levels in 
the partition or queues.
+// Different limits for the same user or group may be defined at different 
levels in the hierarchy:
 // - limit description (optional)
 // - list of users (maybe empty)
 // - list of groups (maybe empty)
@@ -152,8 +149,10 @@ type Limit struct {
        MaxApplications uint64            `yaml:",omitempty" json:",omitempty"`
 }
 
-// Global Node Sorting Policy section
-// - type: different type of policies supported (binpacking, fair etc)
+// NodeSortingPolicy to be applied globally.
+// - type: different type of policies supported (binpacking, fair etc.)
+// - resource weight: factor to be applied to comparisons of different 
resource types when sorting nodes. Types not
+// mentioned have a weight of 1.0.
 type NodeSortingPolicy struct {
        Type            string
        ResourceWeights map[string]float64 `yaml:",omitempty" json:",omitempty"`
diff --git a/pkg/common/configs/config_test.go 
b/pkg/common/configs/config_test.go
index 53f737c0..72eaf31e 100644
--- a/pkg/common/configs/config_test.go
+++ b/pkg/common/configs/config_test.go
@@ -626,6 +626,7 @@ partitions:
       - name: root
     preemption:
       enabled: true
+      quotapreemptionenabled: true
   - name: "partition-0"
     queues:
       - name: root
@@ -637,6 +638,7 @@ partitions:
       - name: root
     preemption:
       enabled: false
+      quotapreemptionenabled: false
   - name: "partition-0"
     queues:
       - name: root
@@ -644,15 +646,18 @@ partitions:
        // validate the config and check after the update
        conf, err := CreateConfig(data)
        assert.NilError(t, err, "should expect no error")
-       assert.Assert(t, conf.Partitions[0].Preemption.Enabled == nil, "default 
should be nil")
+       assert.Assert(t, conf.Partitions[0].Preemption.Enabled == nil, "default 
preemption should be nil")
+       assert.Assert(t, conf.Partitions[0].Preemption.QuotaPreemptionEnabled 
== nil, "default quota preemption should be nil")
 
        conf, err = CreateConfig(dataEnabled)
        assert.NilError(t, err, "should expect no error")
        assert.Assert(t, *conf.Partitions[0].Preemption.Enabled, "preemption 
should be enabled")
+       assert.Assert(t, *conf.Partitions[0].Preemption.QuotaPreemptionEnabled, 
"quota preemption should be enabled")
 
        conf, err = CreateConfig(dataDisabled)
        assert.NilError(t, err, "should expect no error")
        assert.Assert(t, !*conf.Partitions[0].Preemption.Enabled, "preemption 
should be disabled")
+       assert.Assert(t, 
!*conf.Partitions[0].Preemption.QuotaPreemptionEnabled, "quota preemption 
should be disabled")
 }
 
 func TestParseRule(t *testing.T) {
diff --git a/pkg/common/configs/configvalidator.go 
b/pkg/common/configs/configvalidator.go
index 109d295f..23e90577 100644
--- a/pkg/common/configs/configvalidator.go
+++ b/pkg/common/configs/configvalidator.go
@@ -41,6 +41,7 @@ const (
        DotReplace       = "_dot_"
        DefaultPartition = "default"
 
+       // constants defining the names for properties
        ApplicationSortPolicy                    = "application.sort.policy"
        ApplicationSortPriority                  = "application.sort.priority"
        ApplicationUnschedulableAsksBackoff      = 
"application.unschedasks.backoff"
@@ -49,6 +50,7 @@ const (
        PriorityOffset                           = "priority.offset"
        PreemptionPolicy                         = "preemption.policy"
        PreemptionDelay                          = "preemption.delay"
+       QuotaPreemptionDelay                     = "preemption.quota.delay"
 
        // app sort priority values
        ApplicationSortPriorityEnabled  = "enabled"
@@ -61,35 +63,39 @@ const (
        errLastQueueLeaf
 )
 
-type placementPathCheckResult int
-
-// Priority
+// MinPriority and MaxPriority used in offset calculations. K8s uses an int32 
value for priorities, internal calculations
+// use int64.
 var MinPriority int32 = math.MinInt32
 var MaxPriority int32 = math.MaxInt32
 
+// DefaultQuotaPreemptionDelay is 0, no quota preemption by default
+var DefaultQuotaPreemptionDelay time.Duration = 0
+
+// DefaultPreemptionDelay is 30 seconds, guaranteed resources must be set to 
trigger preemption
 var DefaultPreemptionDelay = 30 * time.Second
 var DefaultAskBackOffDelay = 30 * time.Second
 
-// A queue can be a username with the dot replaced. Most systems allow a 32 
character user name.
+// QueueNameRegExp to validate the name of a queue.
+// A queue can be a username with the dot replaced. Most systems allow a 32 
character username.
 // The queue name must thus allow for at least that length with the 
replacement of dots.
 var QueueNameRegExp = regexp.MustCompile(`^[a-zA-Z0-9_:#/@-]{1,64}$`)
 
-// User and group name check: systems allow different things POSIX is the base 
but we need to be lenient and allow more.
-// A username must start with a letter(uppercase or lowercase),
-// followed by any number of letter(uppercase or lowercase), digits, ':', '#', 
'/', '_', '.', '@', '-', and may end with '$'.
+// UserRegExp to validate a username. Systems allow different things POSIX is 
the base, but we need to be lenient and allow more.
+// A username must start with a letter(uppercase or lowercase), followed by 
any number of letter(uppercase or lowercase),
+// digits, ':', '#', '/', '_', '.', '@', '-', and may end with '$'.
+// This supports OIDC compliant names.
 var UserRegExp = regexp.MustCompile(`^[_a-zA-Z][a-zA-Z0-9:#/_.@-]*[$]?$`)
 
-// Groups should have a slightly more restrictive regexp (no # / @ or $ at the 
end)
+// GroupRegExp is similar to UserRegExp, groups have a slightly more 
restrictive regexp (no # / @ or $ at the end)
 var GroupRegExp = regexp.MustCompile(`^[_a-zA-Z][a-zA-Z0-9:_.-]*$`)
 
-// all characters that make a name different from a regexp
+// SpecialRegExp matches all characters that make a name different from a 
regexp
 var SpecialRegExp = regexp.MustCompile(`[\^$*+?()\[{}|]`)
 
-// The rule maps to a go identifier check that regexp only
+// RuleNameRegExp as rule names must map to a go identifier check that regexp 
only
 var RuleNameRegExp = regexp.MustCompile(`^[_a-zA-Z][a-zA-Z0-9_]*$`)
 
-// Minimum Quota change preemption delay
-var minQuotaChangePreemptionDelay uint64 = 60
+type placementPathCheckResult int
 
 type placementStaticPath struct {
        path           string
@@ -668,9 +674,6 @@ func checkQueues(queue *QueueConfig, level int) error {
                        return fmt.Errorf("duplicate child name found with name 
'%s', level %d", child.Name, level)
                }
                queueMap[strings.ToLower(child.Name)] = true
-               if queue.Preemption.Delay != 0 && queue.Preemption.Delay <= 
minQuotaChangePreemptionDelay {
-                       return fmt.Errorf("invalid preemption delay %d, must be 
between %d and %d", queue.Preemption.Delay, minQuotaChangePreemptionDelay, 
uint64(math.MaxUint64))
-               }
        }
 
        // recurse into the depth if this level passed
diff --git a/pkg/common/configs/configvalidator_test.go 
b/pkg/common/configs/configvalidator_test.go
index 9b014c22..4d0c6495 100644
--- a/pkg/common/configs/configvalidator_test.go
+++ b/pkg/common/configs/configvalidator_test.go
@@ -2327,34 +2327,6 @@ func TestCheckQueues(t *testing.T) { //nolint:funlen
                                assert.Equal(t, 2, len(q.Queues), "Expected two 
queues")
                        },
                },
-               {
-                       name: "Invalid Preemption delay for leaf queue",
-                       queue: &QueueConfig{
-                               Name: "root",
-                               Queues: []QueueConfig{
-                                       {Name: "leaf",
-                                               Preemption: Preemption{
-                                                       Delay: 10,
-                                               },
-                                       },
-                               },
-                               Preemption: Preemption{
-                                       Delay: 10,
-                               },
-                       },
-                       level:            1,
-                       expectedErrorMsg: "invalid preemption delay 10, must be 
between 60 and 18446744073709551615",
-               },
-               {
-                       name: "Setting Preemption delay on root queue would be 
ignored",
-                       queue: &QueueConfig{
-                               Name: "root",
-                               Preemption: Preemption{
-                                       Delay: 10,
-                               },
-                       },
-                       level: 0,
-               },
        }
 
        for _, tc := range testCases {
diff --git a/pkg/scheduler/objects/preemption_utilities_test.go 
b/pkg/scheduler/objects/preemption_utilities_test.go
index 0d30e79c..b63c85b3 100644
--- a/pkg/scheduler/objects/preemption_utilities_test.go
+++ b/pkg/scheduler/objects/preemption_utilities_test.go
@@ -158,7 +158,7 @@ func resetQueue(queue *Queue) {
        queue.maxResource = nil
        queue.allocatedResource = nil
        queue.guaranteedResource = nil
-       queue.isQuotaChangePreemptionRunning = false
+       queue.isQuotaPreemptionRunning = false
        queue.preemptingResource = nil
 }
 
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index f8044c24..18342434 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -54,46 +54,46 @@ type Queue struct {
        Name      string // Queue name as in the config etc.
 
        // Private fields need protection
-       sortType            policies.SortPolicy       // How applications 
(leaf) or queues (parents) are sorted
-       children            map[string]*Queue         // Only for direct 
children, parent queue only
-       childPriorities     map[string]int32          // cached priorities for 
child queues
-       applications        map[string]*Application   // only for leaf queue
-       appPriorities       map[string]int32          // cached priorities for 
application
-       reservedApps        map[string]int            // applications reserved 
within this queue, with reservation count
-       parent              *Queue                    // link back to the 
parent in the scheduler
-       pending             *resources.Resource       // pending resource for 
the apps in the queue
-       allocatedResource   *resources.Resource       // allocated resource for 
the apps in the queue
-       preemptingResource  *resources.Resource       // preempting resource 
for the apps in the queue
-       prioritySortEnabled bool                      // whether priority is 
used for request sorting
-       priorityPolicy      policies.PriorityPolicy   // priority policy
-       priorityOffset      int32                     // priority offset for 
this queue relative to others
-       preemptionPolicy    policies.PreemptionPolicy // preemption policy
-       preemptionDelay     time.Duration             // time before preemption 
is considered
-       currentPriority     int32                     // the current scheduling 
priority of this queue
+       sortType             policies.SortPolicy       // How applications 
(leaf) or queues (parents) are sorted
+       children             map[string]*Queue         // Only for direct 
children, parent queue only
+       childPriorities      map[string]int32          // cached priorities for 
child queues
+       applications         map[string]*Application   // only for leaf queue
+       appPriorities        map[string]int32          // cached priorities for 
application
+       reservedApps         map[string]int            // applications reserved 
within this queue, with reservation count
+       parent               *Queue                    // link back to the 
parent in the scheduler
+       pending              *resources.Resource       // pending resource for 
the apps in the queue
+       allocatedResource    *resources.Resource       // allocated resource 
for the apps in the queue
+       preemptingResource   *resources.Resource       // preempting resource 
for the apps in the queue
+       prioritySortEnabled  bool                      // whether priority is 
used for request sorting
+       priorityPolicy       policies.PriorityPolicy   // priority policy
+       priorityOffset       int32                     // priority offset for 
this queue relative to others
+       preemptionPolicy     policies.PreemptionPolicy // preemption policy
+       preemptionDelay      time.Duration             // time delay before 
preemption is considered
+       quotaPreemptionDelay time.Duration             // time delay before 
quota preemption is considered
+       currentPriority      int32                     // the current 
scheduling priority of this queue
 
        // The queue properties should be treated as immutable the value is a 
merge of the
        // parent properties with the config for this queue only manipulated 
during creation
        // of the queue or via a queue configuration update.
-       properties                     map[string]string
-       adminACL                       security.ACL        // admin ACL
-       submitACL                      security.ACL        // submit ACL
-       maxResource                    *resources.Resource // When not set, max 
= nil
-       guaranteedResource             *resources.Resource // When not set, 
Guaranteed == 0
-       isLeaf                         bool                // this is a leaf 
queue or not (i.e. parent)
-       isManaged                      bool                // queue is part of 
the config, not auto created
-       stateMachine                   *fsm.FSM            // the state of the 
queue for scheduling
-       stateTime                      time.Time           // last time the 
state was updated (needed for cleanup)
-       maxRunningApps                 uint64
-       runningApps                    uint64
-       allocatingAcceptedApps         map[string]bool
-       template                       *template.Template
-       queueEvents                    *schedEvt.QueueEvents
-       appQueueMapping                *AppQueueMapping // appID mapping to 
queues
-       quotaChangePreemptionDelay     uint64
-       quotaChangePreemptionStartTime time.Time
-       isQuotaChangePreemptionRunning bool
-       unschedAskBackoff              uint64
-       askBackoffDelay                time.Duration
+       properties               map[string]string
+       adminACL                 security.ACL        // admin ACL
+       submitACL                security.ACL        // submit ACL
+       maxResource              *resources.Resource // When not set, max = nil
+       guaranteedResource       *resources.Resource // When not set, 
Guaranteed == 0
+       isLeaf                   bool                // this is a leaf queue or 
not (i.e. parent)
+       isManaged                bool                // queue is part of the 
config, not auto created
+       stateMachine             *fsm.FSM            // the state of the queue 
for scheduling
+       stateTime                time.Time           // last time the state was 
updated (needed for cleanup)
+       maxRunningApps           uint64
+       runningApps              uint64
+       allocatingAcceptedApps   map[string]bool
+       template                 *template.Template
+       queueEvents              *schedEvt.QueueEvents
+       appQueueMapping          *AppQueueMapping // appID mapping to queues
+       quotaPreemptionStartTime time.Time
+       isQuotaPreemptionRunning bool
+       unschedAskBackoff        uint64
+       askBackoffDelay          time.Duration
 
        locking.RWMutex
 }
@@ -101,24 +101,24 @@ type Queue struct {
 // newBlankQueue creates a new empty queue objects with all values initialised.
 func newBlankQueue() *Queue {
        return &Queue{
-               children:                       make(map[string]*Queue),
-               childPriorities:                make(map[string]int32),
-               applications:                   make(map[string]*Application),
-               appPriorities:                  make(map[string]int32),
-               reservedApps:                   make(map[string]int),
-               allocatingAcceptedApps:         make(map[string]bool),
-               properties:                     make(map[string]string),
-               stateMachine:                   NewObjectState(),
-               allocatedResource:              resources.NewResource(),
-               preemptingResource:             resources.NewResource(),
-               pending:                        resources.NewResource(),
-               currentPriority:                configs.MinPriority,
-               prioritySortEnabled:            true,
-               preemptionDelay:                configs.DefaultPreemptionDelay,
-               preemptionPolicy:               
policies.DefaultPreemptionPolicy,
-               quotaChangePreemptionDelay:     0,
-               quotaChangePreemptionStartTime: time.Time{},
-               askBackoffDelay:                configs.DefaultAskBackOffDelay,
+               children:                 make(map[string]*Queue),
+               childPriorities:          make(map[string]int32),
+               applications:             make(map[string]*Application),
+               appPriorities:            make(map[string]int32),
+               reservedApps:             make(map[string]int),
+               allocatingAcceptedApps:   make(map[string]bool),
+               properties:               make(map[string]string),
+               stateMachine:             NewObjectState(),
+               allocatedResource:        resources.NewResource(),
+               preemptingResource:       resources.NewResource(),
+               pending:                  resources.NewResource(),
+               currentPriority:          configs.MinPriority,
+               prioritySortEnabled:      true,
+               preemptionDelay:          configs.DefaultPreemptionDelay,
+               preemptionPolicy:         policies.DefaultPreemptionPolicy,
+               quotaPreemptionDelay:     0,
+               quotaPreemptionStartTime: time.Time{},
+               askBackoffDelay:          configs.DefaultAskBackOffDelay,
        }
 }
 
@@ -139,7 +139,7 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent 
*Queue, silence bool, a
        sq.updateMaxRunningAppsMetrics()
 
        // update the properties
-       if err := sq.applyConf(conf, silence); err != nil {
+       if _, err := sq.applyConf(conf, silence); err != nil {
                return nil, errors.Join(errors.New("configured queue creation 
failed: "), err)
        }
 
@@ -148,13 +148,13 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent 
*Queue, silence bool, a
        if parent != nil {
                // pull the properties from the parent that should be set on 
the child
                sq.mergeProperties(parent.getProperties(), conf.Properties)
-               sq.UpdateQueueProperties()
+               sq.UpdateQueueProperties(nil)
                err := parent.addChildQueue(sq)
                if err != nil {
                        return nil, errors.Join(errors.New("configured queue 
creation failed: "), err)
                }
        } else {
-               sq.UpdateQueueProperties()
+               sq.UpdateQueueProperties(nil)
        }
 
        if !silence {
@@ -219,7 +219,7 @@ func newDynamicQueueInternal(name string, leaf bool, parent 
*Queue, appQueueMapp
                return nil, errors.Join(errors.New("dynamic queue creation 
failed: "), err)
        }
 
-       sq.UpdateQueueProperties()
+       sq.UpdateQueueProperties(nil)
        sq.queueEvents = schedEvt.NewQueueEvents(events.GetEventSystem())
        log.Log(log.SchedQueue).Info("dynamic queue added to scheduler",
                zap.String("queueName", sq.QueuePath))
@@ -272,13 +272,13 @@ func (sq *Queue) mergeProperties(parent, config 
map[string]string) {
        }
 }
 
-func preemptionDelay(value string) (time.Duration, error) {
+func convertDelay(value string, def time.Duration) (time.Duration, error) {
        result, err := time.ParseDuration(value)
        if err != nil {
-               return configs.DefaultPreemptionDelay, err
+               return def, err
        }
        if int64(result) <= int64(0) {
-               return configs.DefaultPreemptionDelay, fmt.Errorf("%s must be 
positive: %s", configs.PreemptionDelay, value)
+               return def, fmt.Errorf("delay must be positive value: %s", 
value)
        }
        return result, nil
 }
@@ -328,19 +328,8 @@ func unschedulableAskBackoff(value string) (uint64, error) 
{
        return intValue, nil
 }
 
-func backoffDelay(value string) (time.Duration, error) {
-       result, err := time.ParseDuration(value)
-       if err != nil {
-               return configs.DefaultAskBackOffDelay, err
-       }
-       if int64(result) <= int64(0) {
-               return configs.DefaultAskBackOffDelay, fmt.Errorf("%s must be 
positive: %s", configs.ApplicationUnschedulableAsksBackoffDelay, value)
-       }
-       return result, nil
-}
-
 // ApplyConf is the locked version of applyConf
-func (sq *Queue) ApplyConf(conf configs.QueueConfig) error {
+func (sq *Queue) ApplyConf(conf configs.QueueConfig) (*resources.Resource, 
error) {
        sq.Lock()
        defer sq.Unlock()
        return sq.applyConf(conf, false)
@@ -349,20 +338,21 @@ func (sq *Queue) ApplyConf(conf configs.QueueConfig) 
error {
 // applyConf applies all the properties to the queue from the config.
 // lock free call, must be called holding the queue lock or during create only.
 // If the silence flag is set to true, the function will not log when setting 
users and groups.
-func (sq *Queue) applyConf(conf configs.QueueConfig, silence bool) error {
+// This function MUST be called holding the lock for the queue.
+func (sq *Queue) applyConf(conf configs.QueueConfig, silence bool) 
(*resources.Resource, error) {
        // Set the ACLs
        var err error
        sq.submitACL, err = security.NewACL(conf.SubmitACL, silence)
        if err != nil {
                log.Log(log.SchedQueue).Error("parsing submit ACL failed this 
should not happen",
                        zap.Error(err))
-               return err
+               return nil, err
        }
        sq.adminACL, err = security.NewACL(conf.AdminACL, silence)
        if err != nil {
                log.Log(log.SchedQueue).Error("parsing admin ACL failed this 
should not happen",
                        zap.Error(err))
-               return err
+               return nil, err
        }
        // Change from unmanaged to managed
        if !sq.isManaged {
@@ -392,85 +382,129 @@ func (sq *Queue) applyConf(conf configs.QueueConfig, 
silence bool) error {
 
        if !sq.isLeaf {
                if err = sq.setTemplate(conf.ChildTemplate); err != nil {
-                       return err
+                       return nil, err
                }
        }
 
+       oldMaxResource := sq.maxResource
        // Load the max & guaranteed resources and maxApps for all but the root 
queue
        if sq.Name != configs.RootQueue {
-               oldMaxResource := sq.maxResource
                if err = sq.setResourcesFromConf(conf.Resources); err != nil {
-                       return err
+                       return nil, err
                }
                sq.maxRunningApps = conf.MaxApplications
                sq.updateMaxRunningAppsMetrics()
-               sq.setPreemptionSettings(oldMaxResource, conf)
        }
-
        sq.properties = conf.Properties
-       return nil
+       return oldMaxResource, nil
 }
 
-// setPreemptionSettings Set Quota change preemption settings
-func (sq *Queue) setPreemptionSettings(oldMaxResource *resources.Resource, 
conf configs.QueueConfig) {
-       newMaxResource, err := resources.NewResourceFromConf(conf.Resources.Max)
-       if err != nil {
-               log.Log(log.SchedQueue).Error("parsing failed on max resources 
this should not happen",
-                       zap.String("queue", sq.QueuePath),
-                       zap.Error(err))
+// setPreemptionTime set the time the quota preemption should be triggered 
when a quota is changed.
+// Updates the time if the delay is set / changes and the preemption is not 
running yet.
+// This function MUST be called holding the lock for the queue.
+func (sq *Queue) setPreemptionTime(oldMaxResource *resources.Resource, 
oldDelay time.Duration) {
+       // if quota preemption is running nothing we do not have an influence
+       // if the delay for the queue is not set do not trigger preemption
+       if sq.isQuotaPreemptionRunning || sq.quotaPreemptionDelay == 0 {
                return
        }
-
-       switch {
-       // Set max res earlier but not now
-       case resources.IsZero(newMaxResource) && 
!resources.IsZero(oldMaxResource):
-               sq.quotaChangePreemptionDelay = 0
-               sq.quotaChangePreemptionStartTime = time.Time{}
-               // Set max res now but not earlier
-       case !resources.IsZero(newMaxResource) && 
resources.IsZero(oldMaxResource) && conf.Preemption.Delay != 0:
-               sq.quotaChangePreemptionDelay = conf.Preemption.Delay
-               sq.quotaChangePreemptionStartTime = 
time.Now().Add(time.Duration(int64(sq.quotaChangePreemptionDelay)) * 
time.Second) //nolint:gosec
-               // Set max res earlier and now as well
-       default:
-               switch {
-               // Quota decrease
-               case resources.StrictlyGreaterThan(oldMaxResource, 
newMaxResource) && conf.Preemption.Delay != 0:
-                       sq.quotaChangePreemptionDelay = conf.Preemption.Delay
-                       sq.quotaChangePreemptionStartTime = 
time.Now().Add(time.Duration(sq.quotaChangePreemptionDelay) * time.Second) 
//nolint:gosec
-                       // Quota increase
-               case resources.StrictlyGreaterThan(newMaxResource, 
oldMaxResource) && conf.Preemption.Delay != 0:
-                       sq.quotaChangePreemptionDelay = 0
-                       sq.quotaChangePreemptionStartTime = time.Time{}
-                       // Quota remains as is but delay has changed
-               case resources.Equals(oldMaxResource, newMaxResource) && 
conf.Preemption.Delay != 0 && sq.quotaChangePreemptionDelay != 
conf.Preemption.Delay:
-                       sq.quotaChangePreemptionDelay = conf.Preemption.Delay
-                       sq.quotaChangePreemptionStartTime = 
time.Now().Add(time.Duration(sq.quotaChangePreemptionDelay) * time.Second) 
//nolint:gosec
-               default:
-                       // noop
+       // if no current limit we should not preempt even if it was set 
earlier, clear the start time
+       if resources.IsZero(sq.maxResource) {
+               sq.quotaPreemptionStartTime = time.Time{}
+               log.Log(log.SchedQueue).Info("removed quota preemption start 
time",
+                       zap.String("queue", sq.QueuePath))
+               return
+       }
+       // max resources unchanged: adjust the start time for a changed delay
+       if resources.Equals(oldMaxResource, sq.maxResource) {
+               if sq.quotaPreemptionStartTime.IsZero() {
+                       // if a delay is set later than the quota change we 
would like to trigger preemption even if no change for
+                       // max resources is detected. preemption trigger will 
clean up if usage is below max already.
+                       if oldDelay == 0 && sq.quotaPreemptionDelay > 0 {
+                               sq.quotaPreemptionStartTime = 
time.Now().Add(sq.quotaPreemptionDelay)
+                               log.Log(log.SchedQueue).Info("set quota 
preemption start time based on delay change",
+                                       zap.String("queue", sq.QueuePath),
+                                       zap.Time("quotaPreemptionStartTime", 
sq.quotaPreemptionStartTime))
+                       }
+               } else {
+                       if oldDelay != sq.quotaPreemptionDelay {
+                               // apply the delta: this can be positive or 
negative does not matter. The resulting time could be in the
+                               // past: also not an issue.
+                               sq.quotaPreemptionStartTime = 
sq.quotaPreemptionStartTime.Add(sq.quotaPreemptionDelay - oldDelay)
+                               log.Log(log.SchedQueue).Info("updated quota 
preemption start time based on delay change",
+                                       zap.String("queue", sq.QueuePath),
+                                       zap.Time("quotaPreemptionStartTime", 
sq.quotaPreemptionStartTime),
+                                       zap.Duration("delta", 
sq.quotaPreemptionDelay-oldDelay))
+                       }
                }
+               return
+       }
+       // quota has been lowered, need to set the start time.
+       if resources.StrictlyGreaterThan(oldMaxResource, sq.maxResource) {
+               if !sq.quotaPreemptionStartTime.IsZero() {
+                       // multiple consecutive lowering of quotas detected: 
check for delay change
+                       if oldDelay != sq.quotaPreemptionDelay {
+                               sq.quotaPreemptionStartTime = 
sq.quotaPreemptionStartTime.Add(sq.quotaPreemptionDelay - oldDelay)
+                               log.Log(log.SchedQueue).Info("consecutive 
lowering of quota with delay change",
+                                       zap.String("queue", sq.QueuePath),
+                                       zap.Time("quotaPreemptionStartTime", 
sq.quotaPreemptionStartTime),
+                                       zap.Duration("delta", 
sq.quotaPreemptionDelay-oldDelay))
+                               return
+                       }
+                       log.Log(log.SchedQueue).Info("consecutive lowering of 
quota: preemption time already set from earlier quota change",
+                               zap.String("queue", sq.QueuePath),
+                               zap.Time("quotaPreemptionStartTime", 
sq.quotaPreemptionStartTime))
+                       return
+               }
+               sq.quotaPreemptionStartTime = 
time.Now().Add(sq.quotaPreemptionDelay)
+               log.Log(log.SchedQueue).Info("Setting quota preemption time",
+                       zap.String("queue", sq.QueuePath),
+                       zap.Stringer("old maxResource", oldMaxResource),
+                       zap.Stringer("maxResources", sq.maxResource),
+                       zap.Duration("delay", sq.quotaPreemptionDelay),
+                       zap.Time("quotaPreemptionStartTime", 
sq.quotaPreemptionStartTime))
        }
 }
 
-// resetPreemptionSettings Reset Quota change preemption settings
-func (sq *Queue) resetPreemptionSettings() {
-       sq.Lock()
-       defer sq.Unlock()
-       sq.quotaChangePreemptionDelay = 0
-       sq.quotaChangePreemptionStartTime = time.Time{}
-}
-
-// shouldTriggerPreemption Should preemption be triggered or not to enforce 
new max quota?
+// shouldTriggerPreemption returns true if quota preemption should be 
triggered based on the settings and the
+// current time.
 func (sq *Queue) shouldTriggerPreemption() bool {
        sq.RLock()
        defer sq.RUnlock()
-       return sq.quotaChangePreemptionDelay != 0 && 
!sq.quotaChangePreemptionStartTime.IsZero() && 
time.Now().After(sq.quotaChangePreemptionStartTime)
+       // dynamic queues do not support quota preemption
+       if !sq.isManaged {
+               return false
+       }
+       // already in progress do not run again
+       if sq.isQuotaPreemptionRunning {
+               return false
+       }
+       // usage is below max: no need to trigger. Happens if the queue drops 
below the new max when pods stop.
+       // NOTE(review): cleans up anyway, writing under a read lock can race.
+       if 
sq.maxResource.StrictlyGreaterThanOrEqualsOnlyExisting(sq.allocatedResource) {
+               sq.quotaPreemptionStartTime = time.Time{}
+               return false
+       }
+       // trigger if the time is right
+       return !sq.quotaPreemptionStartTime.IsZero() && 
time.Now().After(sq.quotaPreemptionStartTime)
 }
 
-// getPreemptionSettings Get preemption settings. Only for testing
-func (sq *Queue) getPreemptionSettings() (uint64, time.Time) {
+// setQuotaPreemptionState sets or clears the running state for quota 
preemption. When the state is cleared the start time is also
+// cleared out, preventing a re-run based on the same change.
+func (sq *Queue) setQuotaPreemptionState(run bool) {
+       sq.Lock()
+       defer sq.Unlock()
+       if !run {
+               sq.quotaPreemptionStartTime = time.Time{}
+       }
+       sq.isQuotaPreemptionRunning = run
+}
+
+// getQuotaPreemptionRunning returns the running state for quota preemption.
+func (sq *Queue) getQuotaPreemptionRunning() bool {
        sq.RLock()
        defer sq.RUnlock()
-       return sq.quotaChangePreemptionDelay, sq.quotaChangePreemptionStartTime
+       return sq.isQuotaPreemptionRunning
 }
 
 // setResourcesFromConf sets the maxResource and guaranteedResource of the 
queue from the config.
@@ -578,7 +612,7 @@ func (sq *Queue) setTemplate(conf configs.ChildTemplate) 
error {
 }
 
 // UpdateQueueProperties updates the queue properties defined as text
-func (sq *Queue) UpdateQueueProperties() {
+func (sq *Queue) UpdateQueueProperties(oldMaxResource *resources.Resource) {
        sq.Lock()
        defer sq.Unlock()
        if common.IsRecoveryQueue(sq.QueuePath) {
@@ -632,26 +666,32 @@ func (sq *Queue) UpdateQueueProperties() {
                        }
                case configs.PreemptionDelay:
                        if sq.isLeaf {
-                               sq.preemptionDelay, err = preemptionDelay(value)
+                               sq.preemptionDelay, err = convertDelay(value, 
configs.DefaultPreemptionDelay)
                                if err != nil {
                                        
log.Log(log.SchedQueue).Debug("preemption delay property configuration error",
                                                zap.Error(err))
                                }
                        }
                case configs.ApplicationUnschedulableAsksBackoff:
-                       unschedAskBackoff, err := unschedulableAskBackoff(value)
+                       sq.unschedAskBackoff, err = 
unschedulableAskBackoff(value)
                        if err != nil {
                                log.Log(log.SchedQueue).Debug("unschedulable 
ask backoff configuration error",
                                        zap.Error(err))
                        }
-                       sq.unschedAskBackoff = unschedAskBackoff
                case configs.ApplicationUnschedulableAsksBackoffDelay:
-                       askBackoffDelay, err := backoffDelay(value)
+                       sq.askBackoffDelay, err = convertDelay(value, 
configs.DefaultAskBackOffDelay)
                        if err != nil {
                                log.Log(log.SchedQueue).Debug("unschedulable 
ask backoff delay configuration error",
                                        zap.Error(err))
                        }
-                       sq.askBackoffDelay = askBackoffDelay
+               case configs.QuotaPreemptionDelay:
+                       oldDelay := sq.quotaPreemptionDelay
+                       sq.quotaPreemptionDelay, err = convertDelay(value, 
configs.DefaultQuotaPreemptionDelay)
+                       if err != nil {
+                               log.Log(log.SchedQueue).Debug("quota preemption 
delay configuration error",
+                                       zap.Error(err))
+                       }
+                       sq.setPreemptionTime(oldMaxResource, oldDelay)
                default:
                        // skip unknown properties just log them
                        log.Log(log.SchedQueue).Debug("queue property skipped",
@@ -725,7 +765,7 @@ func (sq *Queue) GetPreemptingResource() 
*resources.Resource {
 func (sq *Queue) GetGuaranteedResource() *resources.Resource {
        sq.RLock()
        defer sq.RUnlock()
-       return sq.guaranteedResource
+       return sq.guaranteedResource.Clone()
 }
 
 // GetMaxApps returns the maximum number of applications that can run in this 
queue.
@@ -735,7 +775,7 @@ func (sq *Queue) GetMaxApps() uint64 {
        return sq.maxRunningApps
 }
 
-// GetActualGuaranteedResources returns the actual (including parent) 
guaranteed resources for the queue.
+// GetActualGuaranteedResource returns the actual (including parent) 
guaranteed resources for the queue.
 func (sq *Queue) GetActualGuaranteedResource() *resources.Resource {
        if sq == nil {
                return resources.NewResource()
@@ -817,6 +857,7 @@ func (sq *Queue) GetPartitionQueueDAOInfo(include bool) 
dao.PartitionQueueDAOInf
        queueInfo.PreemptionEnabled = sq.preemptionPolicy != 
policies.DisabledPreemptionPolicy
        queueInfo.IsPreemptionFence = sq.preemptionPolicy == 
policies.FencePreemptionPolicy
        queueInfo.PreemptionDelay = sq.preemptionDelay.String()
+       queueInfo.QuotaPreemptionDelay = sq.quotaPreemptionDelay.String()
        queueInfo.IsPriorityFence = sq.priorityPolicy == 
policies.FencePriorityPolicy
        queueInfo.PriorityOffset = sq.priorityOffset
        queueInfo.Properties = make(map[string]string)
@@ -1402,7 +1443,7 @@ func (sq *Queue) GetMaxResource() *resources.Resource {
        return sq.internalGetMax(limit)
 }
 
-func (sq *Queue) CloneMaxResource() *resources.Resource {
+func (sq *Queue) cloneMaxResource() *resources.Resource {
        sq.RLock()
        defer sq.RUnlock()
        return sq.maxResource.Clone()
@@ -1540,7 +1581,25 @@ func (sq *Queue) canRunApp(appID string) bool {
 // resources are skipped.
 // Applications are sorted based on the application sortPolicy. Applications 
without pending resources are skipped.
 // Lock free call this all locks are taken when needed in called functions
-func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() 
NodeIterator, getnode func(string) *Node, allowPreemption bool) 
*AllocationResult {
+func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() 
NodeIterator, getnode func(string) *Node, allowPreemption, quotaPreemption 
bool) *AllocationResult {
+       if quotaPreemption && sq.shouldTriggerPreemption() {
+               go func() {
+                       log.Log(log.SchedQueue).Info("Preconditions has passed 
trigger preemption to enforce new max resources",
+                               zap.String("queueName", sq.GetQueuePath()),
+                               zap.Stringer("maxResource", 
sq.cloneMaxResource()))
+                       preemptor := NewQuotaPreemptor(sq)
+                       preemptor.tryPreemption()
+               }()
+               // when we trigger for a parent queue do not trigger for the 
children just yet. At least wait until the next
+               // scheduling cycle.
+               quotaPreemption = false
+       }
+       // if quota preemption is running for this queue we do not want to 
trigger for any of the children.
+       // we do a top-down approach: parent first and when done we check the 
children
+       // there could be a child quota preemption running already
+       if sq.getQuotaPreemptionRunning() {
+               quotaPreemption = false
+       }
        if sq.IsLeafQueue() {
                // get the headroom
                headRoom := sq.getHeadRoom()
@@ -1574,26 +1633,10 @@ func (sq *Queue) TryAllocate(iterator func() 
NodeIterator, fullIterator func() N
                                return result
                        }
                }
-
-               // Should we trigger preemption to enforce new quota?
-               if sq.shouldTriggerPreemption() {
-                       go func() {
-                               log.Log(log.SchedQueue).Info("Trigger 
preemption to enforce new max resources",
-                                       zap.String("queueName", sq.QueuePath),
-                                       zap.String("max resources", 
sq.maxResource.String()))
-                               preemptor := NewQuotaChangePreemptor(sq)
-                               if preemptor.CheckPreconditions() {
-                                       
log.Log(log.SchedQueue).Info("Preconditions has passed to trigger preemption to 
enforce new max resources",
-                                               zap.String("queueName", 
sq.QueuePath),
-                                               zap.String("max resources", 
sq.maxResource.String()))
-                                       preemptor.tryPreemption()
-                               }
-                       }()
-               }
        } else {
                // process the child queues (filters out queues without pending 
requests)
                for _, child := range sq.sortQueues() {
-                       result := child.TryAllocate(iterator, fullIterator, 
getnode, allowPreemption)
+                       result := child.TryAllocate(iterator, fullIterator, 
getnode, allowPreemption, quotaPreemption)
                        if result != nil {
                                return result
                        }
@@ -2190,47 +2233,6 @@ func (sq *Queue) recalculatePriority() int32 {
        return priorityValueByPolicy(sq.priorityPolicy, sq.priorityOffset, curr)
 }
 
-func (sq *Queue) MarkQuotaChangePreemptionRunning(run bool) {
-       sq.Lock()
-       defer sq.Unlock()
-       sq.isQuotaChangePreemptionRunning = run
-}
-
-func (sq *Queue) isQCPreemptionRunningForParent() bool {
-       if sq == nil {
-               return false
-       }
-       if sq.parent != nil {
-               if sq.parent.isQCPreemptionRunningForParent() {
-                       return true
-               }
-       }
-       sq.RLock()
-       defer sq.RUnlock()
-       return sq.isQuotaChangePreemptionRunning
-}
-
-func (sq *Queue) isQCPreemptionRunningForChild() bool {
-       for _, child := range sq.GetCopyOfChildren() {
-               if child.isQCPreemptionRunningForChild() {
-                       return true
-               }
-       }
-       sq.RLock()
-       defer sq.RUnlock()
-       return sq.isQuotaChangePreemptionRunning
-}
-
-func (sq *Queue) IsQCPreemptionRunning() bool {
-       if sq.isQCPreemptionRunningForParent() {
-               return true
-       }
-       if sq.isQCPreemptionRunningForChild() {
-               return true
-       }
-       return false
-}
-
 func (sq *Queue) GetMaxAppUnschedAskBackoff() uint64 {
        sq.RLock()
        defer sq.RUnlock()
diff --git a/pkg/scheduler/objects/queue_test.go 
b/pkg/scheduler/objects/queue_test.go
index 4308749a..399e8581 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -43,8 +43,6 @@ import (
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
-const ZeroResource string = "{\"resources\":{\"first\":{\"value\":0}}}"
-
 // base test for creating a managed queue
 func TestQueueBasics(t *testing.T) {
        // create the root
@@ -1794,7 +1792,7 @@ func TestGetPartitionQueueDAOInfo(t *testing.T) {
                configs.PreemptionDelay:       "3600s",
                configs.PreemptionPolicy:      
policies.FencePreemptionPolicy.String(),
        }
-       leaf.UpdateQueueProperties()
+       leaf.UpdateQueueProperties(nil)
        leafDAO := leaf.GetPartitionQueueDAOInfo(false)
        assert.Equal(t, leafDAO.QueueName, "root.leaf-queue")
        assert.Equal(t, len(leafDAO.Children), 0, "leaf has no children")
@@ -1810,7 +1808,7 @@ func TestGetPartitionQueueDAOInfo(t *testing.T) {
                configs.PreemptionDelay:       "10s",
                configs.PreemptionPolicy:      
policies.DisabledPreemptionPolicy.String(),
        }
-       leaf.UpdateQueueProperties()
+       leaf.UpdateQueueProperties(nil)
        leafDAO = leaf.GetPartitionQueueDAOInfo(false)
        assert.Equal(t, leafDAO.PreemptionEnabled, false, "preemption should 
not be enabled")
        assert.Equal(t, leafDAO.IsPreemptionFence, false, "queue should not be 
a fence")
@@ -2230,14 +2228,14 @@ func TestApplyConf(t *testing.T) {
        errConf1 := configs.QueueConfig{
                SubmitACL: "error Submit ACL",
        }
-       err = errQueue.ApplyConf(errConf1)
+       _, err = errQueue.ApplyConf(errConf1)
        assert.ErrorContains(t, err, "multiple spaces found in ACL")
 
        // wrong AdminACL
        errConf2 := configs.QueueConfig{
                AdminACL: "error Admin ACL",
        }
-       err = errQueue.ApplyConf(errConf2)
+       _, err = errQueue.ApplyConf(errConf2)
        assert.ErrorContains(t, err, "multiple spaces found in ACL")
 
        // wrong ChildTemplate
@@ -2249,7 +2247,7 @@ func TestApplyConf(t *testing.T) {
                        },
                },
        }
-       err = errQueue.ApplyConf(errConf3)
+       _, err = errQueue.ApplyConf(errConf3)
        assert.ErrorContains(t, err, "invalid quantity")
 
        // isManaged is changed from unmanaged to managed
@@ -2261,7 +2259,7 @@ func TestApplyConf(t *testing.T) {
        child, err := NewDynamicQueue("child", true, parent, nil)
        assert.NilError(t, err, "failed to create basic queue: %v", err)
 
-       err = child.ApplyConf(childConf)
+       _, err = child.ApplyConf(childConf)
        assert.NilError(t, err, "failed to parse conf: %v", err)
        assert.Equal(t, child.IsManaged(), true)
 
@@ -2278,7 +2276,7 @@ func TestApplyConf(t *testing.T) {
        parent, err = createManagedQueueWithProps(nil, "parent", false, nil, 
nil)
        assert.NilError(t, err, "failed to create basic queue: %v", err)
 
-       err = parent.ApplyConf(parentConf)
+       _, err = parent.ApplyConf(parentConf)
        assert.NilError(t, err, "failed to parse conf: %v", err)
        assert.Equal(t, parent.IsLeafQueue(), false)
 
@@ -2308,7 +2306,7 @@ func TestApplyConf(t *testing.T) {
        assert.Assert(t, validTemplate != nil)
 
        conf.Parent = false
-       err = leaf.ApplyConf(conf)
+       _, err = leaf.ApplyConf(conf)
        assert.NilError(t, err, "failed to apply conf: %v", err)
        assert.Assert(t, leaf.template == nil)
        assert.Assert(t, leaf.maxResource != nil)
@@ -2320,7 +2318,7 @@ func TestApplyConf(t *testing.T) {
        assert.NilError(t, err, "failed to create basic queue: %v", err)
 
        conf.Parent = true
-       err = queue.ApplyConf(conf)
+       _, err = queue.ApplyConf(conf)
        assert.NilError(t, err, "failed to apply conf: %v", err)
        assert.Assert(t, queue.template != nil)
        assert.Assert(t, queue.maxResource != nil)
@@ -2331,142 +2329,193 @@ func TestApplyConf(t *testing.T) {
        root, err := createManagedQueueWithProps(nil, "root", true, nil, nil)
        assert.NilError(t, err, "failed to create basic queue: %v", err)
 
-       err = queue.ApplyConf(conf)
+       _, err = queue.ApplyConf(conf)
        assert.NilError(t, err, "failed to apply conf: %v", err)
        assert.Assert(t, root.maxResource == nil)
        assert.Assert(t, root.guaranteedResource == nil)
        assert.Equal(t, root.maxRunningApps, uint64(0))
 }
 
-func TestQuotaChangePreemptionSettings(t *testing.T) {
-       root, err := createManagedQueueWithProps(nil, "root", true, nil, nil)
+func TestQuotaPreemptionSettings(t *testing.T) {
+       root, err := createManagedQueue(nil, "root", true, nil)
        assert.NilError(t, err, "failed to create basic queue: %v", err)
 
-       parent, err := createManagedQueueWithProps(root, "parent", false, nil, 
nil)
-       assert.NilError(t, err, "failed to create basic queue: %v", err)
        testCases := []struct {
-               name          string
-               conf          configs.QueueConfig
-               expectedDelay uint64
-       }{{"first time queue setup without delay", configs.QueueConfig{
-               Resources: configs.Resources{
-                       Max:        getResourceConf(),
-                       Guaranteed: getResourceConf(),
-               },
-       }, 0},
-               {"clearing max resources", configs.QueueConfig{
-                       Resources: configs.Resources{
-                               Max:        nil,
-                               Guaranteed: nil,
-                       },
-               }, 0},
-               {"first time queue setup with delay", configs.QueueConfig{
-                       Resources: configs.Resources{
-                               Max:        getResourceConf(),
-                               Guaranteed: getResourceConf(),
-                       },
-                       Preemption: configs.Preemption{
-                               Delay: 1,
-                       },
-               }, 1},
-               {"increase max with delay", configs.QueueConfig{
-                       Resources: configs.Resources{
-                               Max: map[string]string{"memory": "100000000"},
-                       },
-                       Preemption: configs.Preemption{
-                               Delay: 500,
-                       },
-               }, 0},
-               {"decrease max with delay", configs.QueueConfig{
-                       Resources: configs.Resources{
-                               Max: map[string]string{"memory": "100"},
-                       },
-                       Preemption: configs.Preemption{
-                               Delay: 2,
-                       },
-               }, 2},
-               {"max remains as is but delay changed", configs.QueueConfig{
-                       Resources: configs.Resources{
-                               Max: map[string]string{"memory": "100"},
-                       },
-                       Preemption: configs.Preemption{
-                               Delay: 2,
-                       },
-               }, 2},
-               {"unrelated config change, should not impact earlier set 
preemption settings", configs.QueueConfig{
-                       Resources: configs.Resources{
-                               Max:        map[string]string{"memory": "100"},
-                               Guaranteed: map[string]string{"memory": "50"},
-                       },
-                       Preemption: configs.Preemption{
-                               Delay: 2,
-                       },
-               }, 2},
-               {"increase max again with delay", configs.QueueConfig{
-                       Resources: configs.Resources{
-                               Max: map[string]string{"memory": "101"},
-                       },
-                       Preemption: configs.Preemption{
-                               Delay: 200,
-                       },
-               }, 0}}
-
+               name       string
+               maxRes     map[string]string
+               conf       configs.QueueConfig
+               oldDelay   time.Duration
+               timeChange bool
+       }{
+               {"clearing max",
+                       map[string]string{"memory": "500"},
+                       configs.QueueConfig{
+                               Resources: configs.Resources{
+                                       Max: nil,
+                               },
+                       }, 0, false},
+               {"clearing max with delay",
+                       map[string]string{"memory": "500"},
+                       configs.QueueConfig{
+                               Resources: configs.Resources{
+                                       Max: nil,
+                               },
+                               Properties: 
map[string]string{configs.QuotaPreemptionDelay: "50ms"},
+                       }, 0, false},
+               {"incorrect delay",
+                       map[string]string{"memory": "500"},
+                       configs.QueueConfig{
+                               Resources: configs.Resources{
+                                       Max: nil,
+                               },
+                               Properties: 
map[string]string{configs.QuotaPreemptionDelay: "-50s"},
+                       }, 0, false},
+               {"increase max with delay",
+                       map[string]string{"memory": "500"},
+                       configs.QueueConfig{
+                               Resources: configs.Resources{
+                                       Max: map[string]string{"memory": 
"1000"},
+                               },
+                               Properties: 
map[string]string{configs.QuotaPreemptionDelay: "50ms"},
+                       }, 50 * time.Millisecond, false},
+               {"decrease max with delay",
+                       map[string]string{"memory": "500"},
+                       configs.QueueConfig{
+                               Resources: configs.Resources{
+                                       Max: map[string]string{"memory": "100"},
+                               },
+                               Properties: 
map[string]string{configs.QuotaPreemptionDelay: "50ms"},
+                       }, 50 * time.Millisecond, true},
+               {"delay changed from 0 no max change",
+                       map[string]string{"memory": "500"},
+                       configs.QueueConfig{
+                               Resources: configs.Resources{
+                                       Max: map[string]string{"memory": "500"},
+                               },
+                               Properties: 
map[string]string{configs.QuotaPreemptionDelay: "50ms"},
+                       }, 0, true},
+       }
+
+       var oldMax *resources.Resource
+       var parent *Queue
        for _, tc := range testCases {
                t.Run(tc.name, func(t *testing.T) {
-                       err = parent.ApplyConf(tc.conf)
+                       var expectedMax *resources.Resource
+                       if tc.maxRes != nil {
+                               expectedMax, err = 
resources.NewResourceFromConf(tc.maxRes)
+                               assert.NilError(t, err, "resource creation 
failed")
+                       }
+                       parent, err = createManagedQueue(root, "parent", false, 
tc.maxRes)
+                       assert.NilError(t, err, "failed to create basic queue: 
%v", err)
+                       oldMax, err = parent.ApplyConf(tc.conf)
                        assert.NilError(t, err, "failed to apply conf: %v", err)
+                       assert.Assert(t, resources.Equals(oldMax, expectedMax), 
"old max not from setup")
 
-                       // assert the preemption settings
-                       delay, sTime := parent.getPreemptionSettings()
-                       assert.Equal(t, delay, tc.expectedDelay)
-                       if tc.expectedDelay != uint64(0) {
-                               assert.Equal(t, sTime.IsZero(), false)
-                       } else {
-                               assert.Equal(t, sTime.IsZero(), true)
-                       }
+                       // apply properties and set a usage
+                       parent.allocatedResource = resources.Multiply(oldMax, 2)
+                       parent.quotaPreemptionDelay = tc.oldDelay
+                       parent.UpdateQueueProperties(oldMax)
+                       // wait until the delay expires so preemption can 
trigger automatically
+                       time.Sleep(parent.quotaPreemptionDelay + 
50*time.Millisecond)
+                       assert.Equal(t, parent.shouldTriggerPreemption(), 
tc.timeChange, "preemption should get trigger for set delay")
+                       parent.TryAllocate(nil, nil, nil, false, true)
+
+                       time.Sleep(50 * time.Millisecond)
+
+                       // since preemption settings are reset, preemption 
should not be triggered again during the next check
                        assert.Equal(t, parent.shouldTriggerPreemption(), false)
+               })
+       }
+}
 
-                       used, err := 
resources.NewResourceFromConf(tc.conf.Resources.Max)
-                       assert.NilError(t, err, "failed to set allocated 
resource: %v", err)
-                       parent.allocatedResource = resources.Multiply(used, 2)
+func TestShouldTriggerPreemption(t *testing.T) {
+       parentConfig := configs.QueueConfig{
+               Name:   "parent",
+               Parent: true,
+               Resources: configs.Resources{
+                       Max: map[string]string{"memory": "1000"},
+               },
+       }
+       parent, err := NewConfiguredQueue(parentConfig, nil, false, nil)
+       assert.NilError(t, err)
+       parent.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000, 
"cpu": 2000})
+       parent.quotaPreemptionStartTime = time.Now()
 
-                       // Wait till delay expires to let trigger preemption 
automatically
-                       time.Sleep(time.Duration(int64(tc.expectedDelay)+1) * 
time.Second)
-                       if tc.expectedDelay != uint64(0) {
-                               assert.Equal(t, 
parent.shouldTriggerPreemption(), true)
-                       }
-                       parent.TryAllocate(nil, nil, nil, false)
-
-                       // preemption settings should be same as before even 
now as trigger is async process
-                       delay, sTime = parent.getPreemptionSettings()
-                       assert.Equal(t, delay, tc.expectedDelay)
-                       if tc.expectedDelay != uint64(0) {
-                               assert.Equal(t, sTime.IsZero(), false)
-                               assert.Equal(t, 
parent.shouldTriggerPreemption(), true)
-                       } else {
-                               assert.Equal(t, sTime.IsZero(), true)
-                       }
+       leafRes := configs.Resources{
+               Max: map[string]string{"memory": "1000"},
+       }
+       leaf, err := NewConfiguredQueue(configs.QueueConfig{
+               Name:      "leaf",
+               Resources: leafRes,
+       }, parent, false, nil)
+       assert.NilError(t, err)
+
+       dynamicLeaf, err := NewConfiguredQueue(configs.QueueConfig{
+               Name:      "dynamic-leaf",
+               Resources: leafRes,
+       }, parent, false, nil)
+       assert.NilError(t, err)
+       dynamicLeaf.isManaged = false
 
-                       time.Sleep(time.Millisecond * 100)
+       alreadyPreemptionRunning, err := NewConfiguredQueue(configs.QueueConfig{
+               Name:      "leaf-already-preemption-running",
+               Resources: leafRes,
+       }, parent, false, nil)
+       assert.NilError(t, err)
+       alreadyPreemptionRunning.setQuotaPreemptionState(true)
 
-                       // preemption should have been triggered by now, assert 
preemption settings to ensure values are reset
-                       if tc.expectedDelay != uint64(0) {
-                               delay, sTime = parent.getPreemptionSettings()
-                               assert.Equal(t, sTime.IsZero(), true)
-                               assert.Equal(t, delay, uint64(0))
+       usageExceededMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
+               Name:      "leaf-usage-exceeded-max",
+               Resources: leafRes,
+       }, parent, false, nil)
+       assert.NilError(t, err)
+       usageExceededMaxQueue.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000, 
"cpu": 2000})
 
-                               // since preemption settings are set, 
preemption should not be triggerred again during tryAllocate
-                               assert.Equal(t, 
parent.shouldTriggerPreemption(), false)
-                       }
+       usageEqualsMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
+               Name:      "leaf-usage-equals-max",
+               Resources: leafRes,
+       }, parent, false, nil)
+       assert.NilError(t, err)
+       usageEqualsMaxQueue.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000, 
"cpu": 1000})
+
+       usageNotMatchingMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
+               Name: "leaf-usage-res-not-matching-max-res",
+               Resources: configs.Resources{
+                       Max: map[string]string{"cpu": "1000"},
+               },
+       }, parent, false, nil)
+       assert.NilError(t, err)
+       usageNotMatchingMaxQueue.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000})
+
+       testCases := []struct {
+               name               string
+               queue              *Queue
+               preconditionResult bool
+       }{
+               {"no usage or max change", leaf, false},
+               {"dynamic queue", dynamicLeaf, false},
+               {"preemption running", alreadyPreemptionRunning, false},
+               {"usage exceeded max, no start time", usageExceededMaxQueue, 
false},
+               {"usage exceeded max, start time set", parent, true},
+               {"usage equals max resources", usageEqualsMaxQueue, false},
+               {"usage res not matching max", usageNotMatchingMaxQueue, false},
+       }
+       for _, tc := range testCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       assert.Equal(t, tc.queue.shouldTriggerPreemption(), 
tc.preconditionResult)
                })
        }
+       // special case: reset time if usage below max
+       usageNotMatchingMaxQueue.quotaPreemptionStartTime = 
time.Now().Add(time.Hour)
+       assert.Assert(t, !usageNotMatchingMaxQueue.shouldTriggerPreemption(), 
"preemption should not be triggered")
+       assert.Assert(t, 
usageNotMatchingMaxQueue.quotaPreemptionStartTime.IsZero(), "start time should 
be reset")
 }
 
 func TestNewConfiguredQueue(t *testing.T) {
        // check variable assignment
        properties := getProperties()
        resourceConf := getResourceConf()
-       // turn resouce config into resource struct
+       // turn resource config into resource struct
        resourceStruct, err := resources.NewResourceFromConf(resourceConf)
        assert.NilError(t, err, "failed to create new resource from config: 
%v", err)
 
@@ -2491,20 +2540,19 @@ func TestNewConfiguredQueue(t *testing.T) {
        assert.DeepEqual(t, properties, parent.template.GetProperties())
        assert.Assert(t, resources.Equals(resourceStruct, 
parent.template.GetMaxResource()))
        assert.Assert(t, resources.Equals(resourceStruct, 
parent.template.GetGuaranteedResource()))
-       assert.Equal(t, parent.quotaChangePreemptionDelay, uint64(0))
+       assert.Equal(t, parent.preemptionDelay, configs.DefaultPreemptionDelay)
+       assert.Equal(t, parent.quotaPreemptionDelay, 
configs.DefaultQuotaPreemptionDelay)
 
        // case 0: managed leaf queue can't use template
        leafConfig := configs.QueueConfig{
-               Name:       "leaf_queue",
-               Parent:     false,
-               Properties: getProperties(),
+               Name:   "leaf_queue",
+               Parent: false,
+               Properties: map[string]string{configs.PreemptionDelay: "10s",
+                       configs.QuotaPreemptionDelay: "1h"},
                Resources: configs.Resources{
                        Max:        getResourceConf(),
                        Guaranteed: getResourceConf(),
                },
-               Preemption: configs.Preemption{
-                       Delay: 500,
-               },
        }
        childLeaf, err := NewConfiguredQueue(leafConfig, parent, false, nil)
        assert.NilError(t, err, "failed to create queue: %v", err)
@@ -2517,15 +2565,13 @@ func TestNewConfiguredQueue(t *testing.T) {
        childLeafGuaranteed, err := 
resources.NewResourceFromConf(leafConfig.Resources.Guaranteed)
        assert.NilError(t, err, "Resource creation failed")
        assert.Assert(t, resources.Equals(childLeaf.guaranteedResource, 
childLeafGuaranteed))
-       assert.Equal(t, childLeaf.quotaChangePreemptionDelay, uint64(500))
+       assert.Equal(t, childLeaf.preemptionDelay, 10*time.Second)
+       assert.Equal(t, childLeaf.quotaPreemptionDelay, 1*time.Hour)
 
-       // case 1: non-leaf can't use template but it can inherit template from 
parent
+       // case 1: non-leaf can't use template, but it can inherit template 
from parent
        NonLeafConfig := configs.QueueConfig{
                Name:   "nonleaf_queue",
                Parent: true,
-               Preemption: configs.Preemption{
-                       Delay: 500,
-               },
        }
        childNonLeaf, err := NewConfiguredQueue(NonLeafConfig, parent, false, 
nil)
        assert.NilError(t, err, "failed to create queue: %v", err)
@@ -2534,7 +2580,6 @@ func TestNewConfiguredQueue(t *testing.T) {
        assert.Equal(t, len(childNonLeaf.properties), 0)
        assert.Assert(t, childNonLeaf.guaranteedResource == nil)
        assert.Assert(t, childNonLeaf.maxResource == nil)
-       assert.Equal(t, childNonLeaf.quotaChangePreemptionDelay, uint64(0))
 
        // case 2: do not send queue event when silence flag is set to true
        events.Init()
@@ -2542,17 +2587,12 @@ func TestNewConfiguredQueue(t *testing.T) {
        eventSystem.StartServiceWithPublisher(false)
        rootConfig := configs.QueueConfig{
                Name: "root",
-               Preemption: configs.Preemption{
-                       Delay: 500,
-               },
        }
-
-       rootQ, err := NewConfiguredQueue(rootConfig, nil, true, nil)
+       _, err = NewConfiguredQueue(rootConfig, nil, true, nil)
        assert.NilError(t, err, "failed to create queue: %v", err)
        time.Sleep(time.Second)
        noEvents := eventSystem.Store.CountStoredEvents()
        assert.Equal(t, noEvents, uint64(0), "expected 0 event, got %d", 
noEvents)
-       assert.Equal(t, rootQ.quotaChangePreemptionDelay, uint64(0))
 }
 
 func TestResetRunningState(t *testing.T) {
@@ -2575,11 +2615,11 @@ func TestResetRunningState(t *testing.T) {
        parent.MarkQueueForRemoval()
        assert.Assert(t, parent.IsDraining(), "parent should be marked as 
draining")
        assert.Assert(t, leaf.IsDraining(), "leaf should be marked as draining")
-       err = parent.applyConf(emptyConf, false)
+       _, err = parent.applyConf(emptyConf, false)
        assert.NilError(t, err, "failed to update parent")
        assert.Assert(t, parent.IsRunning(), "parent should be running again")
        assert.Assert(t, leaf.IsDraining(), "leaf should still be marked as 
draining")
-       err = leaf.applyConf(emptyConf, false)
+       _, err = leaf.applyConf(emptyConf, false)
        assert.NilError(t, err, "failed to update leaf")
        assert.Assert(t, leaf.IsRunning(), "leaf should be running again")
 }
@@ -2910,8 +2950,8 @@ func TestQueueEvents(t *testing.T) {
        eventSystem := events.GetEventSystem().(*events.EventSystemImpl) 
//nolint:errcheck
        eventSystem.StartServiceWithPublisher(false)
        queue, err := createRootQueue(nil)
-       queue.Name = "testQueue"
        assert.NilError(t, err)
+       queue.Name = "testQueue"
 
        app := newApplication(appID0, "default", "root")
        queue.AddApplication(app)
@@ -2947,8 +2987,10 @@ func TestQueueEvents(t *testing.T) {
                        },
                },
        }
-       err = queue.ApplyConf(newConf)
+       var oldMax *resources.Resource
+       oldMax, err = queue.ApplyConf(newConf)
        assert.NilError(t, err)
+       assert.Assert(t, oldMax.IsEmpty())
        err = common.WaitForCondition(10*time.Millisecond, time.Second, func() 
bool {
                noEvents = eventSystem.Store.CountStoredEvents()
                return noEvents == 3
@@ -3144,55 +3186,42 @@ func TestQueueBackoffProperties(t *testing.T) {
        assert.Equal(t, 30*time.Second, leaf3.GetBackoffDelay())
 }
 
-func TestQueue_IsQCPreemptionRunning(t *testing.T) {
-       // create the root
-       root, err := createManagedQueueMaxApps(nil, "root", true, nil, 1)
-       assert.NilError(t, err, "queue create failed")
-       parent, err := createManagedQueue(root, "parent", true, nil)
-       assert.NilError(t, err, "failed to create parent queue")
-
-       var leaf, leaf2, leaf11, leaf111 *Queue
-       leaf, err = createManagedQueue(parent, "leaf", true, nil)
-       assert.NilError(t, err, "failed to create leaf queue")
-       leaf2, err = createManagedQueue(parent, "leaf2", false, nil)
-       assert.NilError(t, err, "failed to create leaf2 queue")
-
-       leaf11, err = createManagedQueue(leaf, "leaf11", true, nil)
-       assert.NilError(t, err, "failed to create leaf11 queue")
-
-       leaf111, err = createManagedQueue(leaf11, "leaf111", false, nil)
-       assert.NilError(t, err, "failed to create leaf111 queue")
-
-       // root.parent is running. any queue located in this hierarchy (both 
upwards and downwards) should return true. All branches of parent should return 
true.
-       parent.isQuotaChangePreemptionRunning = true
-       assert.Equal(t, parent.IsQCPreemptionRunning(), true)
-       assert.Equal(t, root.IsQCPreemptionRunning(), true)
-       assert.Equal(t, leaf111.IsQCPreemptionRunning(), true)
-       assert.Equal(t, leaf11.IsQCPreemptionRunning(), true)
-       assert.Equal(t, leaf.IsQCPreemptionRunning(), true)
-       assert.Equal(t, leaf2.IsQCPreemptionRunning(), true)
-
-       // reset
-       parent.isQuotaChangePreemptionRunning = false
-
-       // root.parent.leaf111 (leaf queue) is running. any queue located in 
this hierarchy (upwards) should return true. Other branches of parent should 
return false.
-       leaf111.isQuotaChangePreemptionRunning = true
-       assert.Equal(t, parent.IsQCPreemptionRunning(), true)
-       assert.Equal(t, root.IsQCPreemptionRunning(), true)
-       assert.Equal(t, leaf111.IsQCPreemptionRunning(), true)
-       assert.Equal(t, leaf11.IsQCPreemptionRunning(), true)
-       assert.Equal(t, leaf.IsQCPreemptionRunning(), true)
-       assert.Equal(t, leaf2.IsQCPreemptionRunning(), false)
-
-       // reset
-       leaf111.isQuotaChangePreemptionRunning = false
-
-       // root.parent.leaf2 (leaf queue) is running. any queue located in this 
hierarchy (upwards) should return true. Other branches of parent should return 
false.
-       leaf2.isQuotaChangePreemptionRunning = true
-       assert.Equal(t, parent.IsQCPreemptionRunning(), true)
-       assert.Equal(t, root.IsQCPreemptionRunning(), true)
-       assert.Equal(t, leaf111.IsQCPreemptionRunning(), false)
-       assert.Equal(t, leaf11.IsQCPreemptionRunning(), false)
-       assert.Equal(t, leaf.IsQCPreemptionRunning(), false)
-       assert.Equal(t, leaf2.IsQCPreemptionRunning(), true)
+func TestQueue_setPreemptionTime(t *testing.T) {
+       root, e := createRootQueue(nil)
+       assert.NilError(t, e, "failed to create basic root queue")
+       tests := []struct {
+               name           string
+               oldMaxResource *resources.Resource
+               maxRes         map[string]string
+               oldDelay       time.Duration
+               delay          time.Duration
+               oldStart       bool
+               timeChange     bool
+       }{
+               {"empty", nil, map[string]string{}, 0, 0, false, false},
+               {"no delays", resources.Zero, map[string]string{"test": "100"}, 
0, 0, false, false},
+               {"max removed", 
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}), nil, 
10, 10, false, false},
+               {"max removed update", 
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}), nil, 
0, 10, true, true},
+               {"delay added", 
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}), 
map[string]string{"test": "100"}, 0, 10, false, true},
+               {"delay change set start", 
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}), 
map[string]string{"test": "100"}, 5, 10, true, true},
+               {"delay change no start", 
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}), 
map[string]string{"test": "100"}, 5, 10, false, false},
+               {"max increase", 
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 10}), 
map[string]string{"test": "100"}, 10, 10, false, false},
+               {"max lowered", 
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}), 
map[string]string{"test": "10"}, 10, 10, false, true},
+               {"max lowered 2nd", 
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}), 
map[string]string{"test": "10"}, 10, 10, true, false},
+               {"delay change max lowered 2nd", 
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}), 
map[string]string{"test": "10"}, 5, 10, true, true},
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       queue, err := createManagedQueue(root, "test", false, 
tt.maxRes)
+                       assert.NilError(t, err, "queue creation failed 
unexpectedly")
+                       queue.quotaPreemptionDelay = tt.delay
+                       if tt.oldStart {
+                               queue.quotaPreemptionStartTime = time.Now()
+                       }
+                       before := queue.quotaPreemptionStartTime
+                       queue.setPreemptionTime(tt.oldMaxResource, tt.oldDelay)
+                       after := queue.quotaPreemptionStartTime
+                       assert.Equal(t, tt.timeChange, before != after, "time 
change not as expected")
+               })
+       }
 }
diff --git a/pkg/scheduler/objects/quota_change_preemptor.go 
b/pkg/scheduler/objects/quota_preemptor.go
similarity index 56%
rename from pkg/scheduler/objects/quota_change_preemptor.go
rename to pkg/scheduler/objects/quota_preemptor.go
index c302dfb3..17a751a7 100644
--- a/pkg/scheduler/objects/quota_change_preemptor.go
+++ b/pkg/scheduler/objects/quota_preemptor.go
@@ -28,21 +28,23 @@ import (
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
-type QuotaChangePreemptionContext struct {
+type QuotaPreemptionContext struct {
        queue               *Queue
        maxResource         *resources.Resource
        guaranteedResource  *resources.Resource
        allocatedResource   *resources.Resource
+       preemptingResource  *resources.Resource
        preemptableResource *resources.Resource
        allocations         []*Allocation
 }
 
-func NewQuotaChangePreemptor(queue *Queue) *QuotaChangePreemptionContext {
-       preemptor := &QuotaChangePreemptionContext{
+func NewQuotaPreemptor(queue *Queue) *QuotaPreemptionContext {
+       preemptor := &QuotaPreemptionContext{
                queue:               queue,
-               maxResource:         queue.CloneMaxResource(),
-               guaranteedResource:  queue.GetGuaranteedResource().Clone(),
-               allocatedResource:   queue.GetAllocatedResource().Clone(),
+               maxResource:         queue.cloneMaxResource(),
+               guaranteedResource:  queue.GetGuaranteedResource(),
+               allocatedResource:   queue.GetAllocatedResource(),
+               preemptingResource:  queue.GetPreemptingResource(),
                preemptableResource: nil,
                allocations:         make([]*Allocation, 0),
        }
@@ -50,72 +52,57 @@ func NewQuotaChangePreemptor(queue *Queue) 
*QuotaChangePreemptionContext {
        return preemptor
 }
 
-func (qcp *QuotaChangePreemptionContext) CheckPreconditions() bool {
-       if !qcp.queue.IsManaged() || qcp.queue.IsQCPreemptionRunning() {
-               return false
-       }
-       if 
qcp.maxResource.StrictlyGreaterThanOrEqualsOnlyExisting(qcp.queue.GetAllocatedResource())
 {
-               return false
+func (qpc *QuotaPreemptionContext) tryPreemption() {
+       // Get Preemptable Resource
+       qpc.setPreemptableResources()
+
+       if qpc.queue.IsLeafQueue() {
+               qpc.tryPreemptionInternal()
+               return
        }
-       return true
-}
+       leafQueues := make(map[*Queue]*resources.Resource)
+       getChildQueuesPreemptableResource(qpc.queue, qpc.preemptableResource, 
leafQueues)
 
-func (qcp *QuotaChangePreemptionContext) tryPreemption() {
-       // Get Preemptable Resource
-       preemptableResource := qcp.getPreemptableResources()
-
-       if !qcp.queue.IsLeafQueue() {
-               leafQueues := make(map[*Queue]*resources.Resource)
-               getChildQueuesPreemptableResource(qcp.queue, 
preemptableResource, leafQueues)
-
-               log.Log(log.SchedQuotaChangePreemption).Info("Triggering quota 
change preemption for parent queue",
-                       zap.String("parent queue", qcp.queue.GetQueuePath()),
-                       zap.String("preemptable resource", 
preemptableResource.String()),
-                       zap.Any("no. of leaf queues with potential victims", 
len(leafQueues)),
-               )
-
-               for leaf, leafPreemptableResource := range leafQueues {
-                       leafQueueQCPC := NewQuotaChangePreemptor(leaf)
-                       
log.Log(log.SchedQuotaChangePreemption).Info("Triggering quota change 
preemption for leaf queue",
-                               zap.String("leaf queue", leaf.GetQueuePath()),
-                               zap.String("max resource", 
leafQueueQCPC.maxResource.String()),
-                               zap.String("guaranteed resource", 
leafQueueQCPC.guaranteedResource.String()),
-                               zap.String("actual allocated resource", 
leafQueueQCPC.allocatedResource.String()),
-                               zap.String("preemptable resource distribution", 
leafPreemptableResource.String()),
-                       )
-                       
leafQueueQCPC.tryPreemptionInternal(leafPreemptableResource)
-               }
-       } else {
-               qcp.tryPreemptionInternal(preemptableResource)
+       log.Log(log.SchedQuotaChangePreemption).Info("Triggering quota change 
preemption for parent queue",
+               zap.String("parent queue", qpc.queue.GetQueuePath()),
+               zap.Stringer("preemptable resource", qpc.preemptableResource),
+               zap.Int("no. of leaf queues with potential victims", 
len(leafQueues)),
+       )
+
+       for leaf, leafPreemptableResource := range leafQueues {
+               leafQueueQCPC := NewQuotaPreemptor(leaf)
+               leafQueueQCPC.preemptableResource = leafPreemptableResource
+               leafQueueQCPC.tryPreemptionInternal()
        }
 }
 
-func (qcp *QuotaChangePreemptionContext) 
tryPreemptionInternal(preemptableResource *resources.Resource) {
+func (qpc *QuotaPreemptionContext) tryPreemptionInternal() {
+       log.Log(log.SchedQuotaChangePreemption).Info("Triggering quota change 
preemption for leaf queue",
+               zap.String("leaf queue", qpc.queue.GetQueuePath()),
+               zap.Stringer("max resource", qpc.maxResource),
+               zap.Stringer("guaranteed resource", qpc.guaranteedResource),
+               zap.Stringer("actual allocated resource", 
qpc.allocatedResource),
+               zap.Stringer("preemptable resource distribution", 
qpc.preemptableResource),
+       )
        // quota change preemption has started, so mark the flag
-       qcp.queue.MarkQuotaChangePreemptionRunning(true)
-
-       qcp.preemptableResource = preemptableResource
+       qpc.queue.setQuotaPreemptionState(true)
 
        // Filter the allocations
-       qcp.allocations = qcp.filterAllocations()
+       qpc.filterAllocations()
 
        // Sort the allocations
-       qcp.sortAllocations()
+       qpc.sortAllocations()
 
        // Preempt the victims
-       qcp.preemptVictims()
+       qpc.preemptVictims()
 
        // quota change preemption has ended, so mark the flag
-       qcp.queue.MarkQuotaChangePreemptionRunning(false)
-
-       // reset settings
-       qcp.queue.resetPreemptionSettings()
+       qpc.queue.setQuotaPreemptionState(false)
 }
 
 // getChildQueuesPreemptableResource Compute leaf queue's preemptable resource 
distribution from the parent's preemptable resource.
-// Start with immediate children of parent, compute each child distribution 
from its parent preemptable resource and repeat the same
+// Starts with immediate children of parent, computes each child distribution 
from its parent preemptable resource and repeats the same
 // for all children at all levels until end leaf queues processed recursively.
-
 // In order to achieve a fair distribution of parent's preemptable resource 
among its children,
 // Higher (relatively) the usage is, higher the preemptable resource would be 
resulted in.
 // Usage above guaranteed (if set) is only considered to derive the 
preemptable resource.
@@ -135,15 +122,17 @@ func getChildQueuesPreemptableResource(queue *Queue, 
parentPreemptableResource *
        // In case guaranteed not set, entire used resources is treated as 
preemptable resource.
        // Total preemptable resource (sum of all children's preemptable 
resources) would be calculated along the way.
        for _, child := range children {
+               allocated := child.GetAllocatedResource()
+               guaranteed := child.GetGuaranteedResource()
                // Skip child if there is no usage or usage below or equals 
guaranteed
-               if child.GetAllocatedResource().IsEmpty() || 
child.GetGuaranteedResource().StrictlyGreaterThanOrEqualsOnlyExisting(child.GetAllocatedResource())
 {
+               if allocated.IsEmpty() || 
guaranteed.StrictlyGreaterThanOrEqualsOnlyExisting(allocated) {
                        continue
                }
                var usedResource *resources.Resource
-               if !child.GetGuaranteedResource().IsEmpty() {
-                       usedResource = 
resources.SubOnlyExisting(child.GetGuaranteedResource(), 
child.GetAllocatedResource())
+               if !guaranteed.IsEmpty() {
+                       usedResource = resources.SubOnlyExisting(guaranteed, 
allocated)
                } else {
-                       usedResource = child.GetAllocatedResource()
+                       usedResource = allocated
                }
                preemptableResource := resources.NewResource()
                for k, v := range usedResource.Resources {
@@ -177,16 +166,15 @@ func getChildQueuesPreemptableResource(queue *Queue, 
parentPreemptableResource *
        }
 }
 
-// getPreemptableResources Get the preemptable resources for the queue
+// setPreemptableResources Computes and stores the preemptable resources for the queue
 // Subtracting the usage from the max resource gives the preemptable resources.
 // It could contain both positive and negative values. Only negative values 
are preemptable.
-func (qcp *QuotaChangePreemptionContext) getPreemptableResources() 
*resources.Resource {
-       maxRes := qcp.queue.CloneMaxResource()
-       used := resources.SubOnlyExisting(qcp.allocatedResource, 
qcp.queue.GetPreemptingResource())
-       if maxRes.IsEmpty() || used.IsEmpty() {
-               return nil
+func (qpc *QuotaPreemptionContext) setPreemptableResources() {
+       used := resources.SubOnlyExisting(qpc.allocatedResource, 
qpc.preemptingResource)
+       if qpc.maxResource.IsEmpty() || used.IsEmpty() {
+               return
        }
-       actual := resources.SubOnlyExisting(maxRes, used)
+       actual := resources.SubOnlyExisting(qpc.maxResource, used)
        preemptableResource := resources.NewResource()
        // Keep only the resource type which needs to be preempted
        for k, v := range actual.Resources {
@@ -195,25 +183,25 @@ func (qcp *QuotaChangePreemptionContext) 
getPreemptableResources() *resources.Re
                }
        }
        if preemptableResource.IsEmpty() {
-               return nil
+               return
        }
-       return preemptableResource
+       qpc.preemptableResource = preemptableResource
 }
 
 // filterAllocations Filter the allocations running in the queue suitable for 
choosing as victims
-func (qcp *QuotaChangePreemptionContext) filterAllocations() []*Allocation {
-       if resources.IsZero(qcp.preemptableResource) {
-               return nil
+func (qpc *QuotaPreemptionContext) filterAllocations() {
+       if resources.IsZero(qpc.preemptableResource) {
+               return
        }
-       var allocations []*Allocation
-       apps := qcp.queue.GetCopyOfApps()
+       qpc.allocations = make([]*Allocation, 0, 5)
+       apps := qpc.queue.GetCopyOfApps()
 
        // Traverse allocations running in the queue
        for _, app := range apps {
                appAllocations := app.GetAllAllocations()
                for _, alloc := range appAllocations {
                        // at least one of a preemptable resource type should 
match with a potential victim
-                       if 
!qcp.preemptableResource.MatchAny(alloc.GetAllocatedResource()) {
+                       if 
!qpc.preemptableResource.MatchAny(alloc.GetAllocatedResource()) {
                                continue
                        }
 
@@ -231,20 +219,19 @@ func (qcp *QuotaChangePreemptionContext) 
filterAllocations() []*Allocation {
                        if alloc.IsPreempted() {
                                continue
                        }
-                       allocations = append(allocations, alloc)
+                       qpc.allocations = append(qpc.allocations, alloc)
                }
        }
        log.Log(log.SchedQuotaChangePreemption).Info("Filtering allocations",
-               zap.String("queue", qcp.queue.GetQueuePath()),
-               zap.Int("filtered allocations", len(allocations)),
+               zap.String("queue", qpc.queue.GetQueuePath()),
+               zap.Int("filtered allocations", len(qpc.allocations)),
        )
-       return allocations
 }
 
 // sortAllocations Sort the allocations running in the queue
-func (qcp *QuotaChangePreemptionContext) sortAllocations() {
-       if len(qcp.allocations) > 0 {
-               SortAllocations(qcp.allocations)
+func (qpc *QuotaPreemptionContext) sortAllocations() {
+       if len(qpc.allocations) > 0 {
+               SortAllocations(qpc.allocations)
        }
 }
 
@@ -252,51 +239,52 @@ func (qcp *QuotaChangePreemptionContext) 
sortAllocations() {
 // When both max and guaranteed resources are set and equal, to comply with 
law of preemption "Ensure usage doesn't go below guaranteed resources",
 // preempt victims on best effort basis. So, preempt victims as close as 
possible to the required resource.
 // Otherwise, exceeding above the required resources slightly is acceptable 
for now.
-func (qcp *QuotaChangePreemptionContext) preemptVictims() {
-       if len(qcp.allocations) == 0 {
+func (qpc *QuotaPreemptionContext) preemptVictims() {
+       if len(qpc.allocations) == 0 {
                log.Log(log.SchedQuotaChangePreemption).Warn("BUG: No victims 
to enforce quota change through preemption",
-                       zap.String("queue", qcp.queue.GetQueuePath()))
+                       zap.String("queue", qpc.queue.GetQueuePath()))
                return
        }
        apps := make(map[*Application][]*Allocation)
        victimsTotalResource := resources.NewResource()
        log.Log(log.SchedQuotaChangePreemption).Info("Found victims for quota 
change preemption",
-               zap.String("queue", qcp.queue.GetQueuePath()),
-               zap.Int("total victims", len(qcp.allocations)),
-               zap.String("max resources", qcp.maxResource.String()),
-               zap.String("guaranteed resources", 
qcp.guaranteedResource.String()),
-               zap.String("allocated resources", 
qcp.allocatedResource.String()),
-               zap.String("preemptable resources", 
qcp.preemptableResource.String()),
-               zap.Bool("isGuaranteedSet", qcp.guaranteedResource.IsEmpty()),
+               zap.String("queue", qpc.queue.GetQueuePath()),
+               zap.Int("total victims", len(qpc.allocations)),
+               zap.Stringer("max resources", qpc.maxResource),
+               zap.Stringer("guaranteed resources", qpc.guaranteedResource),
+               zap.Stringer("allocated resources", qpc.allocatedResource),
+               zap.Stringer("preemptable resources", qpc.preemptableResource),
+               zap.Bool("isGuaranteedSet", qpc.guaranteedResource.IsEmpty()),
        )
-       for _, victim := range qcp.allocations {
-               if 
!qcp.preemptableResource.FitInMaxUndef(victim.GetAllocatedResource()) {
+       for _, victim := range qpc.allocations {
+               victimAlloc := victim.GetAllocatedResource()
+               if !qpc.preemptableResource.FitInMaxUndef(victimAlloc) {
                        continue
                }
-               application := qcp.queue.GetApplication(victim.applicationID)
+               application := qpc.queue.GetApplication(victim.applicationID)
                if application == nil {
                        log.Log(log.SchedQuotaChangePreemption).Warn("BUG: 
application not found in queue",
-                               zap.String("queue", qcp.queue.GetQueuePath()),
+                               zap.String("queue", qpc.queue.GetQueuePath()),
                                zap.String("application", victim.applicationID))
                        continue
                }
 
                // Keep collecting the victims until preemptable resource 
reaches and subtract the usage
-               if 
qcp.preemptableResource.StrictlyGreaterThanOnlyExisting(victimsTotalResource) {
+               if 
qpc.preemptableResource.StrictlyGreaterThanOnlyExisting(victimsTotalResource) {
                        apps[application] = append(apps[application], victim)
-                       
qcp.allocatedResource.SubFrom(victim.GetAllocatedResource())
+                       qpc.allocatedResource.SubFrom(victimAlloc)
                }
 
                // Has usage gone below the guaranteed resources?
                // If yes, revert the recently added victim steps completely 
and try next victim.
-               if !qcp.guaranteedResource.IsEmpty() && 
qcp.guaranteedResource.StrictlyGreaterThanOnlyExisting(qcp.allocatedResource) {
+               if !qpc.guaranteedResource.IsEmpty() && 
qpc.guaranteedResource.StrictlyGreaterThanOnlyExisting(qpc.allocatedResource) {
                        victims := apps[application]
                        exceptRecentlyAddedVictims := victims[:len(victims)-1]
                        apps[application] = exceptRecentlyAddedVictims
-                       
qcp.allocatedResource.AddTo(victim.GetAllocatedResource())
-                       
victimsTotalResource.SubFrom(victim.GetAllocatedResource())
+                       qpc.allocatedResource.AddTo(victimAlloc)
+                       victimsTotalResource.SubFrom(victimAlloc)
                } else {
-                       
victimsTotalResource.AddTo(victim.GetAllocatedResource())
+                       victimsTotalResource.AddTo(victimAlloc)
                }
        }
 
@@ -305,28 +293,22 @@ func (qcp *QuotaChangePreemptionContext) preemptVictims() 
{
                        for _, victim := range victims {
                                err := victim.MarkPreempted()
                                if err != nil {
-                                       
log.Log(log.SchedRequiredNodePreemption).Warn("allocation is already released, 
so not proceeding further on the daemon set preemption process",
-                                               zap.String("applicationID", 
victim.applicationID),
+                                       
log.Log(log.SchedRequiredNodePreemption).Warn("allocation is already released, 
ignoring in quota preemption process",
+                                               zap.String("applicationID", 
victim.GetApplicationID()),
                                                zap.String("allocationKey", 
victim.GetAllocationKey()))
                                        continue
                                }
-                               
log.Log(log.SchedQuotaChangePreemption).Info("Preempting victims for quota 
change preemption",
-                                       zap.String("queue", 
qcp.queue.GetQueuePath()),
-                                       zap.String("victim allocation key", 
victim.allocationKey),
-                                       zap.String("victim allocated 
resources", victim.GetAllocatedResource().String()),
-                                       zap.String("victim application", 
victim.applicationID),
-                                       zap.String("victim node", 
victim.GetNodeID()),
-                               )
-                               
qcp.queue.IncPreemptingResource(victim.GetAllocatedResource())
-                               
victim.SendPreemptedByQuotaChangeEvent(qcp.queue.GetQueuePath())
+                               
log.Log(log.SchedQuotaChangePreemption).Info("Preempting victim for quota 
change preemption",
+                                       zap.String("queue", 
qpc.queue.GetQueuePath()),
+                                       zap.String("allocationKey", 
victim.GetAllocationKey()),
+                                       zap.Stringer("allocatedResources", 
victim.GetAllocatedResource()),
+                                       zap.String("applicationID", 
victim.GetApplicationID()),
+                                       zap.String("nodeID", 
victim.GetNodeID()))
+                               
qpc.queue.IncPreemptingResource(victim.GetAllocatedResource())
+                               
victim.SendPreemptedByQuotaChangeEvent(qpc.queue.GetQueuePath())
                        }
                        app.notifyRMAllocationReleased(victims, 
si.TerminationType_PREEMPTED_BY_SCHEDULER,
-                               "preempting allocations to enforce new max 
quota for queue : "+qcp.queue.GetQueuePath())
+                               "preempting allocations to enforce new max 
quota for queue : "+qpc.queue.GetQueuePath())
                }
        }
 }
-
-// only for testing
-func (qcp *QuotaChangePreemptionContext) getVictims() []*Allocation {
-       return qcp.allocations
-}
diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go 
b/pkg/scheduler/objects/quota_preemptor_test.go
similarity index 89%
rename from pkg/scheduler/objects/quota_change_preemptor_test.go
rename to pkg/scheduler/objects/quota_preemptor_test.go
index d578122a..04b923ec 100644
--- a/pkg/scheduler/objects/quota_change_preemptor_test.go
+++ b/pkg/scheduler/objects/quota_preemptor_test.go
@@ -20,7 +20,6 @@ package objects
 
 import (
        "testing"
-
        "time"
 
        "gotest.tools/v3/assert"
@@ -30,88 +29,6 @@ import (
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
-func TestQuotaChangeCheckPreconditions(t *testing.T) {
-       parentConfig := configs.QueueConfig{
-               Name:   "parent",
-               Parent: true,
-               Resources: configs.Resources{
-                       Max: map[string]string{"memory": "1000"},
-               },
-       }
-       parent, err := NewConfiguredQueue(parentConfig, nil, false, nil)
-       assert.NilError(t, err)
-       parent.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000, 
"cpu": 2000})
-
-       leafRes := configs.Resources{
-               Max: map[string]string{"memory": "1000"},
-       }
-       leaf, err := NewConfiguredQueue(configs.QueueConfig{
-               Name:      "leaf",
-               Resources: leafRes,
-       }, parent, false, nil)
-       assert.NilError(t, err)
-
-       dynamicLeaf, err := NewConfiguredQueue(configs.QueueConfig{
-               Name:      "dynamic-leaf",
-               Resources: leafRes,
-       }, parent, false, nil)
-       assert.NilError(t, err)
-       dynamicLeaf.isManaged = false
-
-       alreadyPreemptionRunning, err := NewConfiguredQueue(configs.QueueConfig{
-               Name:      "leaf-already-preemption-running",
-               Resources: leafRes,
-       }, parent, false, nil)
-       assert.NilError(t, err)
-
-       usageExceededMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
-               Name:      "leaf-usage-exceeded-max",
-               Resources: leafRes,
-       }, parent, false, nil)
-       assert.NilError(t, err)
-       usageExceededMaxQueue.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000, 
"cpu": 2000})
-
-       usageEqualsMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
-               Name:      "leaf-usage-equals-max",
-               Resources: leafRes,
-       }, parent, false, nil)
-       assert.NilError(t, err)
-       usageEqualsMaxQueue.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000, 
"cpu": 1000})
-
-       usageNotMatchingMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
-               Name: "leaf-usage-res-not-matching-max-res",
-               Resources: configs.Resources{
-                       Max: map[string]string{"cpu": "1000"},
-               },
-       }, parent, false, nil)
-       assert.NilError(t, err)
-       usageNotMatchingMaxQueue.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000})
-
-       testCases := []struct {
-               name               string
-               queue              *Queue
-               preemptionRunning  bool
-               preconditionResult bool
-       }{
-               {"parent queue", parent, false, true},
-               {"leaf queue", leaf, false, false},
-               {"dynamic leaf queue", dynamicLeaf, false, false},
-               {"leaf queue, usage exceeded max resources", 
usageExceededMaxQueue, false, true},
-               {"leaf queue, usage equals max resources", usageEqualsMaxQueue, 
false, false},
-               {"leaf queue, already preemption process started or running", 
alreadyPreemptionRunning, true, false},
-       }
-       for _, tc := range testCases {
-               t.Run(tc.name, func(t *testing.T) {
-                       
tc.queue.MarkQuotaChangePreemptionRunning(tc.preemptionRunning)
-                       preemptor := NewQuotaChangePreemptor(tc.queue)
-                       assert.Equal(t, preemptor.CheckPreconditions(), 
tc.preconditionResult)
-               })
-       }
-       // Since parent's leaf queue "leaf-already-preemption-running" is 
running, parent preconditions passed earlier should fail now
-       preemptor := NewQuotaChangePreemptor(parent)
-       assert.Equal(t, preemptor.CheckPreconditions(), false)
-}
-
 func TestQuotaChangeGetPreemptableResource(t *testing.T) {
        leaf, err := NewConfiguredQueue(configs.QueueConfig{
                Name: "leaf",
@@ -137,8 +54,9 @@ func TestQuotaChangeGetPreemptableResource(t *testing.T) {
                t.Run(tc.name, func(t *testing.T) {
                        tc.queue.maxResource = tc.maxResource
                        tc.queue.allocatedResource = tc.usedResource
-                       preemptor := NewQuotaChangePreemptor(tc.queue)
-                       assert.Equal(t, 
resources.Equals(preemptor.getPreemptableResources(), tc.preemptable), true)
+                       preemptor := NewQuotaPreemptor(tc.queue)
+                       preemptor.setPreemptableResources()
+                       assert.Equal(t, 
resources.Equals(preemptor.preemptableResource, tc.preemptable), true)
                })
        }
 }
@@ -191,10 +109,10 @@ func TestQuotaChangeFilterVictims(t *testing.T) {
                                err = asks[5].SetReleased(true)
                                assert.NilError(t, err)
                        }
-                       preemptor := NewQuotaChangePreemptor(tc.queue)
+                       preemptor := NewQuotaPreemptor(tc.queue)
                        preemptor.preemptableResource = tc.preemptableResource
-                       allocations := preemptor.filterAllocations()
-                       assert.Equal(t, len(allocations), 
tc.expectedAllocationsCount)
+                       preemptor.filterAllocations()
+                       assert.Equal(t, len(preemptor.allocations), 
tc.expectedAllocationsCount)
                        removeAllocationAsks(node, asks)
                        resetQueue(leaf)
                })
@@ -267,10 +185,10 @@ func TestQuotaChangeTryPreemption(t *testing.T) {
                        assignAllocationsToQueue(asks, leaf)
                        leaf.maxResource = tc.newMax
                        leaf.guaranteedResource = tc.guaranteed
-                       preemptor := NewQuotaChangePreemptor(tc.queue)
+                       preemptor := NewQuotaPreemptor(tc.queue)
                        preemptor.allocations = asks
                        preemptor.tryPreemption()
-                       assert.Equal(t, len(preemptor.getVictims()), 
tc.totalExpectedVictims)
+                       assert.Equal(t, len(preemptor.allocations), 
tc.totalExpectedVictims)
                        var victimsCount int
                        for _, a := range asks {
                                if a.IsPreempted() {
@@ -379,9 +297,9 @@ func TestQuotaChangeTryPreemptionWithDifferentResTypes(t 
*testing.T) {
                                assignAllocationsToQueue(asks, leaf)
                                leaf.maxResource = tc.newMax
                                leaf.guaranteedResource = tc.guaranteed
-                               preemptor := NewQuotaChangePreemptor(tc.queue)
+                               preemptor := NewQuotaPreemptor(tc.queue)
                                preemptor.tryPreemption()
-                               assert.Equal(t, len(preemptor.getVictims()), 
v.totalExpectedVictims)
+                               assert.Equal(t, len(preemptor.allocations), 
v.totalExpectedVictims)
                                var victimsCount int
                                for _, a := range asks {
                                        if a.IsPreempted() {
@@ -628,7 +546,7 @@ func TestQuotaChangeTryPreemptionForParentQueue(t 
*testing.T) {
                                assignAllocationsToQueue(v, q)
                        }
                        tc.queue.maxResource = tc.newMax
-                       preemptor := NewQuotaChangePreemptor(tc.queue)
+                       preemptor := NewQuotaPreemptor(tc.queue)
                        preemptor.tryPreemption()
                        for q, asks := range tc.victims {
                                var victimsCount int
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 870f46cb..82c398e0 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -65,6 +65,7 @@ type PartitionContext struct {
        reservations           int                             // number of 
reservations
        placeholderAllocations int                             // number of 
placeholder allocations
        preemptionEnabled      bool                            // whether 
preemption is enabled or not
+       quotaPreemptionEnabled bool                            // whether quota 
preemption is enabled or not
        foreignAllocs          map[string]*objects.Allocation  // foreign 
(non-Yunikorn) allocations
        appQueueMapping        *objects.AppQueueMapping        // appID mapping 
to queues
 
@@ -166,6 +167,7 @@ func (pc *PartitionContext) updateNodeSortingPolicy(conf 
configs.PartitionConfig
 // NOTE: this is a lock free call. It should only be called holding the 
PartitionContext lock.
 func (pc *PartitionContext) updatePreemption(conf configs.PartitionConfig) {
        pc.preemptionEnabled = conf.Preemption.Enabled == nil || 
*conf.Preemption.Enabled
+       pc.quotaPreemptionEnabled = conf.Preemption.QuotaPreemptionEnabled != 
nil && *conf.Preemption.QuotaPreemptionEnabled
 }
 
 func (pc *PartitionContext) updatePartitionDetails(conf 
configs.PartitionConfig) error {
@@ -189,12 +191,12 @@ func (pc *PartitionContext) updatePartitionDetails(conf 
configs.PartitionConfig)
        queueConf := conf.Queues[0]
        root := pc.root
        // update the root queue
-       if err := root.ApplyConf(queueConf); err != nil {
+       if _, err = root.ApplyConf(queueConf); err != nil {
                return err
        }
-       root.UpdateQueueProperties()
+       root.UpdateQueueProperties(nil)
        // update the rest of the queues recursively
-       if err := pc.updateQueues(queueConf.Queues, root); err != nil {
+       if err = pc.updateQueues(queueConf.Queues, root); err != nil {
                return err
        }
        // update limit settings: start at the root
@@ -234,16 +236,17 @@ func (pc *PartitionContext) updateQueues(config 
[]configs.QueueConfig, parent *o
                pathName := parentPath + queueConfig.Name
                queue := pc.getQueueInternal(pathName)
                var err error
+               var oldMax *resources.Resource
                if queue == nil {
                        queue, err = objects.NewConfiguredQueue(queueConfig, 
parent, false, pc.appQueueMapping)
                } else {
-                       err = queue.ApplyConf(queueConfig)
+                       oldMax, err = queue.ApplyConf(queueConfig)
                }
                if err != nil {
                        return err
                }
                // special call to convert to a real policy from the property
-               queue.UpdateQueueProperties()
+               queue.UpdateQueueProperties(oldMax)
                if err = pc.updateQueues(queueConfig.Queues, queue); err != nil 
{
                        return err
                }
@@ -816,7 +819,7 @@ func (pc *PartitionContext) tryAllocate() 
*objects.AllocationResult {
                return nil
        }
        // try allocating from the root down
-       result := pc.root.TryAllocate(pc.GetNodeIterator, 
pc.GetFullNodeIterator, pc.GetNode, pc.IsPreemptionEnabled())
+       result := pc.root.TryAllocate(pc.GetNodeIterator, 
pc.GetFullNodeIterator, pc.GetNode, pc.IsPreemptionEnabled(), 
pc.IsQuotaPreemptionEnabled())
        if result != nil {
                return pc.allocate(result)
        }
@@ -1645,6 +1648,12 @@ func (pc *PartitionContext) IsPreemptionEnabled() bool {
        return pc.preemptionEnabled
 }
 
+func (pc *PartitionContext) IsQuotaPreemptionEnabled() bool {
+       pc.RLock()
+       defer pc.RUnlock()
+       return pc.quotaPreemptionEnabled
+}
+
 func (pc *PartitionContext) moveTerminatedApp(appID string) {
        app := pc.getApplication(appID)
        // nothing to do if the app is not found on the partition
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index ebca07f5..e62e501a 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -1050,14 +1050,14 @@ func TestAddAppTaskGroup(t *testing.T) {
 
        // queue now has fair as sort policy app add should fail
        queue := partition.GetQueue(defQueue)
-       err = queue.ApplyConf(configs.QueueConfig{
+       _, err = queue.ApplyConf(configs.QueueConfig{
                Name:       "default",
                Parent:     false,
                Queues:     nil,
                Properties: map[string]string{configs.ApplicationSortPolicy: 
"fair"},
        })
        assert.NilError(t, err, "updating queue should not have failed")
-       queue.UpdateQueueProperties()
+       queue.UpdateQueueProperties(nil)
        err = partition.AddApplication(app)
        if err == nil || partition.getApplication(appID2) != nil {
                t.Errorf("add application should have failed due to queue sort 
policy but did not")
@@ -3775,22 +3775,28 @@ func TestUpdatePreemption(t *testing.T) {
 
        partition, err := newBasePartition()
        assert.NilError(t, err, "Partition creation failed")
-       assert.Assert(t, partition.IsPreemptionEnabled(), "preeemption should 
be enabled by default")
+       assert.Assert(t, partition.IsPreemptionEnabled(), "preemption should be 
enabled by default")
+       assert.Assert(t, !partition.IsQuotaPreemptionEnabled(), "quota 
preemption should be disabled by default")
 
        partition.updatePreemption(configs.PartitionConfig{})
-       assert.Assert(t, partition.IsPreemptionEnabled(), "preeemption should 
be enabled by empty config")
+       assert.Assert(t, partition.IsPreemptionEnabled(), "preemption should be 
enabled by empty config")
+       assert.Assert(t, !partition.IsQuotaPreemptionEnabled(), "quota 
preemption should be disabled by default")
 
        partition.updatePreemption(configs.PartitionConfig{Preemption: 
configs.PartitionPreemptionConfig{}})
-       assert.Assert(t, partition.IsPreemptionEnabled(), "preeemption should 
be enabled by empty preemption section")
+       assert.Assert(t, partition.IsPreemptionEnabled(), "preemption should be 
enabled by empty preemption section")
+       assert.Assert(t, !partition.IsQuotaPreemptionEnabled(), "quota 
preemption should be disabled by empty preemption section")
 
-       partition.updatePreemption(configs.PartitionConfig{Preemption: 
configs.PartitionPreemptionConfig{Enabled: nil}})
-       assert.Assert(t, partition.IsPreemptionEnabled(), "preeemption should 
be enabled by explicit nil")
+       partition.updatePreemption(configs.PartitionConfig{Preemption: 
configs.PartitionPreemptionConfig{Enabled: nil, QuotaPreemptionEnabled: nil}})
+       assert.Assert(t, partition.IsPreemptionEnabled(), "preemption should be 
enabled by explicit nil")
+       assert.Assert(t, !partition.IsQuotaPreemptionEnabled(), "quota 
preemption should be disabled by explicit nil")
 
-       partition.updatePreemption(configs.PartitionConfig{Preemption: 
configs.PartitionPreemptionConfig{Enabled: &True}})
-       assert.Assert(t, partition.IsPreemptionEnabled(), "preeemption should 
be enabled by explicit true")
+       partition.updatePreemption(configs.PartitionConfig{Preemption: 
configs.PartitionPreemptionConfig{Enabled: &True, QuotaPreemptionEnabled: 
&True}})
+       assert.Assert(t, partition.IsPreemptionEnabled(), "preemption should be 
enabled by explicit true")
+       assert.Assert(t, partition.IsQuotaPreemptionEnabled(), "quota 
preemption should be enabled by explicit true")
 
-       partition.updatePreemption(configs.PartitionConfig{Preemption: 
configs.PartitionPreemptionConfig{Enabled: &False}})
-       assert.Assert(t, !partition.IsPreemptionEnabled(), "preeemption should 
be disabled by explicit false")
+       partition.updatePreemption(configs.PartitionConfig{Preemption: 
configs.PartitionPreemptionConfig{Enabled: &False, QuotaPreemptionEnabled: 
&False}})
+       assert.Assert(t, !partition.IsPreemptionEnabled(), "preemption should 
be disabled by explicit false")
+       assert.Assert(t, !partition.IsQuotaPreemptionEnabled(), "quota 
preemption should be disabled by explicit false")
 }
 
 func TestUpdateNodeSortingPolicy(t *testing.T) {
diff --git a/pkg/webservice/dao/partition_info.go 
b/pkg/webservice/dao/partition_info.go
index ed4b4b10..07e27293 100644
--- a/pkg/webservice/dao/partition_info.go
+++ b/pkg/webservice/dao/partition_info.go
@@ -19,11 +19,12 @@
 package dao
 
 type PartitionInfo struct {
-       ClusterID               string            `json:"clusterId"`         // 
no omitempty, cluster id should not be empty
-       Name                    string            `json:"name"`              // 
no omitempty, name should not be empty
-       Capacity                PartitionCapacity `json:"capacity"`          // 
no omitempty, omitempty doesn't work on a structure value
-       NodeSortingPolicy       NodeSortingPolicy `json:"nodeSortingPolicy"` // 
no omitempty, omitempty doesn't work on a structure value
-       PreemptionEnabled       bool              `json:"preemptionEnabled"` // 
no omitempty, false shows preemption status better
+       ClusterID               string            `json:"clusterId"`            
  // no omitempty, cluster id should not be empty
+       Name                    string            `json:"name"`                 
  // no omitempty, name should not be empty
+       Capacity                PartitionCapacity `json:"capacity"`             
  // no omitempty, omitempty doesn't work on a structure value
+       NodeSortingPolicy       NodeSortingPolicy `json:"nodeSortingPolicy"`    
  // no omitempty, omitempty doesn't work on a structure value
+       PreemptionEnabled       bool              `json:"preemptionEnabled"`    
  // no omitempty, false shows preemption status better
+       QuotaPreemptionEnabled  bool              
`json:"quotaPreemptionEnabled"` // no omitempty, false shows quota preemption 
status better
        TotalNodes              int               `json:"totalNodes,omitempty"`
        Applications            map[string]int    
`json:"applications,omitempty"`
        TotalContainers         int               
`json:"totalContainers,omitempty"`
diff --git a/pkg/webservice/dao/queue_info.go b/pkg/webservice/dao/queue_info.go
index 7abe6f49..607d366a 100644
--- a/pkg/webservice/dao/queue_info.go
+++ b/pkg/webservice/dao/queue_info.go
@@ -52,6 +52,7 @@ type PartitionQueueDAOInfo struct {
        PreemptionEnabled      bool                    
`json:"preemptionEnabled"` // no omitempty, false shows preemption status better
        IsPreemptionFence      bool                    
`json:"isPreemptionFence"` // no omitempty, a false value gives a quick way to 
understand whether it's fenced.
        PreemptionDelay        string                  
`json:"preemptionDelay,omitempty"`
+       QuotaPreemptionDelay   string                  
`json:"quotaPreemptionDelay,omitempty"`
        IsPriorityFence        bool                    `json:"isPriorityFence"` 
// no omitempty, a false value gives a quick way to understand whether it's 
fenced.
        PriorityOffset         int32                   
`json:"priorityOffset,omitempty"`
 }
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index a76f76c5..02532862 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -1018,6 +1018,7 @@ func getPartitionInfoDAO(lists 
map[string]*scheduler.PartitionContext) []*dao.Pa
                partitionInfo.State = partitionContext.GetCurrentState()
                partitionInfo.LastStateTransitionTime = 
partitionContext.GetStateTime().UnixNano()
                partitionInfo.PreemptionEnabled = 
partitionContext.IsPreemptionEnabled()
+               partitionInfo.QuotaPreemptionEnabled = 
partitionContext.IsQuotaPreemptionEnabled()
 
                capacityInfo := dao.PartitionCapacity{}
                capacity := partitionContext.GetTotalPartitionResource()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to