This is an automated email from the ASF dual-hosted git repository.

mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bd254cf [YUNIKORN-3140] Quota change through preemption config changes (#1034)
6bd254cf is described below

commit 6bd254cf1972d2635d0087a390969454e243fd6c
Author: mani <[email protected]>
AuthorDate: Wed Nov 5 10:29:08 2025 +0530

    [YUNIKORN-3140] Quota change through preemption config changes (#1034)
    
    Closes: #1034
    
    Signed-off-by: mani <[email protected]>
---
 pkg/common/configs/config.go               |   5 ++
 pkg/common/configs/configvalidator.go      |   3 +
 pkg/common/configs/configvalidator_test.go |  28 ++++++++
 pkg/scheduler/objects/queue.go             | 100 ++++++++++++++++++++---------
 pkg/scheduler/objects/queue_test.go        |  98 +++++++++++++++++++++++++++-
 5 files changed, 202 insertions(+), 32 deletions(-)

diff --git a/pkg/common/configs/config.go b/pkg/common/configs/config.go
index 32a4ef2d..792f833b 100644
--- a/pkg/common/configs/config.go
+++ b/pkg/common/configs/config.go
@@ -84,6 +84,11 @@ type QueueConfig struct {
        ChildTemplate   ChildTemplate     `yaml:",omitempty" json:",omitempty"`
        Queues          []QueueConfig     `yaml:",omitempty" json:",omitempty"`
        Limits          []Limit           `yaml:",omitempty" json:",omitempty"`
+       Preemption      Preemption        `yaml:",omitempty" json:",omitempty"`
+}
+
+type Preemption struct {
+       Delay uint64 `yaml:",omitempty" json:",omitempty"`
 }
 
 type ChildTemplate struct {
diff --git a/pkg/common/configs/configvalidator.go b/pkg/common/configs/configvalidator.go
index 76b537f7..c311823b 100644
--- a/pkg/common/configs/configvalidator.go
+++ b/pkg/common/configs/configvalidator.go
@@ -662,6 +662,9 @@ func checkQueues(queue *QueueConfig, level int) error {
                        return fmt.Errorf("duplicate child name found with name 
'%s', level %d", child.Name, level)
                }
                queueMap[strings.ToLower(child.Name)] = true
+               if queue.Preemption.Delay != 0 && queue.Preemption.Delay <= 60 {
+                       return fmt.Errorf("invalid preemption delay %d, must be 
greater than 60 seconds", queue.Preemption.Delay)
+               }
        }
 
        // recurse into the depth if this level passed
diff --git a/pkg/common/configs/configvalidator_test.go b/pkg/common/configs/configvalidator_test.go
index 4d0c6495..e62f4b3f 100644
--- a/pkg/common/configs/configvalidator_test.go
+++ b/pkg/common/configs/configvalidator_test.go
@@ -2327,6 +2327,34 @@ func TestCheckQueues(t *testing.T) { //nolint:funlen
                                assert.Equal(t, 2, len(q.Queues), "Expected two 
queues")
                        },
                },
+               {
+                       name: "Invalid Preemption delay for leaf queue",
+                       queue: &QueueConfig{
+                               Name: "root",
+                               Queues: []QueueConfig{
+                                       {Name: "leaf",
+                                               Preemption: Preemption{
+                                                       Delay: 10,
+                                               },
+                                       },
+                               },
+                               Preemption: Preemption{
+                                       Delay: 10,
+                               },
+                       },
+                       level:            1,
+                       expectedErrorMsg: "invalid preemption delay 10, must be 
greater than 60 seconds",
+               },
+               {
+                       name: "Setting Preemption delay on root queue would be 
ignored",
+                       queue: &QueueConfig{
+                               Name: "root",
+                               Preemption: Preemption{
+                                       Delay: 10,
+                               },
+                       },
+                       level: 0,
+               },
        }
 
        for _, tc := range testCases {
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index a81b1bfa..db6b972a 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -74,21 +74,22 @@ type Queue struct {
        // The queue properties should be treated as immutable the value is a 
merge of the
        // parent properties with the config for this queue only manipulated 
during creation
        // of the queue or via a queue configuration update.
-       properties             map[string]string
-       adminACL               security.ACL        // admin ACL
-       submitACL              security.ACL        // submit ACL
-       maxResource            *resources.Resource // When not set, max = nil
-       guaranteedResource     *resources.Resource // When not set, Guaranteed 
== 0
-       isLeaf                 bool                // this is a leaf queue or 
not (i.e. parent)
-       isManaged              bool                // queue is part of the 
config, not auto created
-       stateMachine           *fsm.FSM            // the state of the queue 
for scheduling
-       stateTime              time.Time           // last time the state was 
updated (needed for cleanup)
-       maxRunningApps         uint64
-       runningApps            uint64
-       allocatingAcceptedApps map[string]bool
-       template               *template.Template
-       queueEvents            *schedEvt.QueueEvents
-       appQueueMapping        *AppQueueMapping // appID mapping to queues
+       properties                 map[string]string
+       adminACL                   security.ACL        // admin ACL
+       submitACL                  security.ACL        // submit ACL
+       maxResource                *resources.Resource // When not set, max = 
nil
+       guaranteedResource         *resources.Resource // When not set, 
Guaranteed == 0
+       isLeaf                     bool                // this is a leaf queue 
or not (i.e. parent)
+       isManaged                  bool                // queue is part of the 
config, not auto created
+       stateMachine               *fsm.FSM            // the state of the 
queue for scheduling
+       stateTime                  time.Time           // last time the state 
was updated (needed for cleanup)
+       maxRunningApps             uint64
+       runningApps                uint64
+       allocatingAcceptedApps     map[string]bool
+       template                   *template.Template
+       queueEvents                *schedEvt.QueueEvents
+       appQueueMapping            *AppQueueMapping // appID mapping to queues
+       quotaChangePreemptionDelay uint64
 
        locking.RWMutex
 }
@@ -96,21 +97,22 @@ type Queue struct {
 // newBlankQueue creates a new empty queue objects with all values initialised.
 func newBlankQueue() *Queue {
        return &Queue{
-               children:               make(map[string]*Queue),
-               childPriorities:        make(map[string]int32),
-               applications:           make(map[string]*Application),
-               appPriorities:          make(map[string]int32),
-               reservedApps:           make(map[string]int),
-               allocatingAcceptedApps: make(map[string]bool),
-               properties:             make(map[string]string),
-               stateMachine:           NewObjectState(),
-               allocatedResource:      resources.NewResource(),
-               preemptingResource:     resources.NewResource(),
-               pending:                resources.NewResource(),
-               currentPriority:        configs.MinPriority,
-               prioritySortEnabled:    true,
-               preemptionDelay:        configs.DefaultPreemptionDelay,
-               preemptionPolicy:       policies.DefaultPreemptionPolicy,
+               children:                   make(map[string]*Queue),
+               childPriorities:            make(map[string]int32),
+               applications:               make(map[string]*Application),
+               appPriorities:              make(map[string]int32),
+               reservedApps:               make(map[string]int),
+               allocatingAcceptedApps:     make(map[string]bool),
+               properties:                 make(map[string]string),
+               stateMachine:               NewObjectState(),
+               allocatedResource:          resources.NewResource(),
+               preemptingResource:         resources.NewResource(),
+               pending:                    resources.NewResource(),
+               currentPriority:            configs.MinPriority,
+               prioritySortEnabled:        true,
+               preemptionDelay:            configs.DefaultPreemptionDelay,
+               preemptionPolicy:           policies.DefaultPreemptionPolicy,
+               quotaChangePreemptionDelay: 0,
        }
 }
 
@@ -155,7 +157,6 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent *Queue, silence bool, a
                        zap.String("queueName", sq.QueuePath))
                sq.queueEvents.SendNewQueueEvent(sq.QueuePath, sq.isManaged)
        }
-
        return sq, nil
 }
 
@@ -372,17 +373,54 @@ func (sq *Queue) applyConf(conf configs.QueueConfig, silence bool) error {
 
        // Load the max & guaranteed resources and maxApps for all but the root 
queue
        if sq.Name != configs.RootQueue {
+               oldMaxResource := sq.maxResource
                if err = sq.setResourcesFromConf(conf.Resources); err != nil {
                        return err
                }
                sq.maxRunningApps = conf.MaxApplications
                sq.updateMaxRunningAppsMetrics()
+               sq.setPreemptionSettings(oldMaxResource, conf)
        }
 
        sq.properties = conf.Properties
        return nil
 }
 
+// setPreemptionSettings Set Quota change preemption settings
+func (sq *Queue) setPreemptionSettings(oldMaxResource *resources.Resource, 
conf configs.QueueConfig) {
+       newMaxResource, err := resources.NewResourceFromConf(conf.Resources.Max)
+       if err != nil {
+               log.Log(log.SchedQueue).Error("parsing failed on max resources 
this should not happen",
+                       zap.String("queue", sq.QueuePath),
+                       zap.Error(err))
+               return
+       }
+
+       switch {
+       // Set max res earlier but not now
+       case resources.IsZero(newMaxResource) && 
!resources.IsZero(oldMaxResource):
+               sq.quotaChangePreemptionDelay = 0
+               // Set max res now but not earlier
+       case !resources.IsZero(newMaxResource) && 
resources.IsZero(oldMaxResource) && conf.Preemption.Delay != 0:
+               sq.quotaChangePreemptionDelay = conf.Preemption.Delay
+               // Set max res earlier and now as well
+       default:
+               switch {
+               // Quota decrease
+               case resources.StrictlyGreaterThan(oldMaxResource, 
newMaxResource) && conf.Preemption.Delay != 0:
+                       sq.quotaChangePreemptionDelay = conf.Preemption.Delay
+                       // Quota increase
+               case resources.StrictlyGreaterThan(newMaxResource, 
oldMaxResource) && conf.Preemption.Delay != 0:
+                       sq.quotaChangePreemptionDelay = 0
+                       // Quota remains as is but delay has changed
+               case resources.Equals(oldMaxResource, newMaxResource) && 
conf.Preemption.Delay != 0 && sq.quotaChangePreemptionDelay != 
conf.Preemption.Delay:
+                       sq.quotaChangePreemptionDelay = conf.Preemption.Delay
+               default:
+                       // noop
+               }
+       }
+}
+
 // setResourcesFromConf sets the maxResource and guaranteedResource of the 
queue from the config.
 func (sq *Queue) setResourcesFromConf(resource configs.Resources) error {
        maxResource, err := resources.NewResourceFromConf(resource.Max)
diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go
index c6c2594e..67d5a4c2 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -2246,6 +2246,88 @@ func TestApplyConf(t *testing.T) {
        assert.Equal(t, root.maxRunningApps, uint64(0))
 }
 
+func TestQuotaChangePreemptionSettings(t *testing.T) {
+       root, err := createManagedQueueWithProps(nil, "root", true, nil, nil)
+       assert.NilError(t, err, "failed to create basic queue: %v", err)
+
+       parent, err := createManagedQueueWithProps(root, "parent", false, nil, 
nil)
+       assert.NilError(t, err, "failed to create basic queue: %v", err)
+       testCases := []struct {
+               name          string
+               conf          configs.QueueConfig
+               expectedDelay uint64
+       }{{"first time queue setup without delay", configs.QueueConfig{
+               Resources: configs.Resources{
+                       Max:        getResourceConf(),
+                       Guaranteed: getResourceConf(),
+               },
+       }, 0},
+               {"clearing max resources", configs.QueueConfig{
+                       Resources: configs.Resources{
+                               Max:        nil,
+                               Guaranteed: nil,
+                       },
+               }, 0},
+               {"first time queue setup with delay", configs.QueueConfig{
+                       Resources: configs.Resources{
+                               Max:        getResourceConf(),
+                               Guaranteed: getResourceConf(),
+                       },
+                       Preemption: configs.Preemption{
+                               Delay: 100,
+                       },
+               }, 100},
+               {"increase max with delay", configs.QueueConfig{
+                       Resources: configs.Resources{
+                               Max: map[string]string{"memory": "100000000"},
+                       },
+                       Preemption: configs.Preemption{
+                               Delay: 500,
+                       },
+               }, 0},
+               {"decrease max with delay", configs.QueueConfig{
+                       Resources: configs.Resources{
+                               Max: map[string]string{"memory": "100"},
+                       },
+                       Preemption: configs.Preemption{
+                               Delay: 500,
+                       },
+               }, 500},
+               {"max remains as is but delay changed", configs.QueueConfig{
+                       Resources: configs.Resources{
+                               Max: map[string]string{"memory": "100"},
+                       },
+                       Preemption: configs.Preemption{
+                               Delay: 200,
+                       },
+               }, 200},
+               {"unrelated config change, should not impact earlier set 
preemption settings", configs.QueueConfig{
+                       Resources: configs.Resources{
+                               Max:        map[string]string{"memory": "100"},
+                               Guaranteed: map[string]string{"memory": "50"},
+                       },
+                       Preemption: configs.Preemption{
+                               Delay: 200,
+                       },
+               }, 200},
+               {"increase max again with delay", configs.QueueConfig{
+                       Resources: configs.Resources{
+                               Max: map[string]string{"memory": "101"},
+                       },
+                       Preemption: configs.Preemption{
+                               Delay: 200,
+                       },
+               }, 0}}
+
+       for _, tc := range testCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       err = parent.ApplyConf(tc.conf)
+                       assert.NilError(t, err, "failed to apply conf: %v", err)
+                       assert.Equal(t, parent.quotaChangePreemptionDelay, 
tc.expectedDelay)
+               })
+       }
+}
+
 func TestNewConfiguredQueue(t *testing.T) {
        // check variable assignment
        properties := getProperties()
@@ -2275,6 +2357,7 @@ func TestNewConfiguredQueue(t *testing.T) {
        assert.DeepEqual(t, properties, parent.template.GetProperties())
        assert.Assert(t, resources.Equals(resourceStruct, 
parent.template.GetMaxResource()))
        assert.Assert(t, resources.Equals(resourceStruct, 
parent.template.GetGuaranteedResource()))
+       assert.Equal(t, parent.quotaChangePreemptionDelay, uint64(0))
 
        // case 0: managed leaf queue can't use template
        leafConfig := configs.QueueConfig{
@@ -2285,6 +2368,9 @@ func TestNewConfiguredQueue(t *testing.T) {
                        Max:        getResourceConf(),
                        Guaranteed: getResourceConf(),
                },
+               Preemption: configs.Preemption{
+                       Delay: 500,
+               },
        }
        childLeaf, err := NewConfiguredQueue(leafConfig, parent, false, nil)
        assert.NilError(t, err, "failed to create queue: %v", err)
@@ -2297,11 +2383,15 @@ func TestNewConfiguredQueue(t *testing.T) {
        childLeafGuaranteed, err := 
resources.NewResourceFromConf(leafConfig.Resources.Guaranteed)
        assert.NilError(t, err, "Resource creation failed")
        assert.Assert(t, resources.Equals(childLeaf.guaranteedResource, 
childLeafGuaranteed))
+       assert.Equal(t, childLeaf.quotaChangePreemptionDelay, uint64(500))
 
        // case 1: non-leaf can't use template but it can inherit template from 
parent
        NonLeafConfig := configs.QueueConfig{
                Name:   "nonleaf_queue",
                Parent: true,
+               Preemption: configs.Preemption{
+                       Delay: 500,
+               },
        }
        childNonLeaf, err := NewConfiguredQueue(NonLeafConfig, parent, false, 
nil)
        assert.NilError(t, err, "failed to create queue: %v", err)
@@ -2310,6 +2400,7 @@ func TestNewConfiguredQueue(t *testing.T) {
        assert.Equal(t, len(childNonLeaf.properties), 0)
        assert.Assert(t, childNonLeaf.guaranteedResource == nil)
        assert.Assert(t, childNonLeaf.maxResource == nil)
+       assert.Equal(t, childNonLeaf.quotaChangePreemptionDelay, uint64(0))
 
        // case 2: do not send queue event when silence flag is set to true
        events.Init()
@@ -2317,12 +2408,17 @@ func TestNewConfiguredQueue(t *testing.T) {
        eventSystem.StartServiceWithPublisher(false)
        rootConfig := configs.QueueConfig{
                Name: "root",
+               Preemption: configs.Preemption{
+                       Delay: 500,
+               },
        }
-       _, err = NewConfiguredQueue(rootConfig, nil, true, nil)
+
+       rootQ, err := NewConfiguredQueue(rootConfig, nil, true, nil)
        assert.NilError(t, err, "failed to create queue: %v", err)
        time.Sleep(time.Second)
        noEvents := eventSystem.Store.CountStoredEvents()
        assert.Equal(t, noEvents, uint64(0), "expected 0 event, got %d", 
noEvents)
+       assert.Equal(t, rootQ.quotaChangePreemptionDelay, uint64(0))
 }
 
 func TestResetRunningState(t *testing.T) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to