This is an automated email from the ASF dual-hosted git repository.

mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new cefba883 [YUNIKORN-3155] Leaf Queue Selection process when parent 
queue quota lowers (#1051)
cefba883 is described below

commit cefba883ac89a2e228a82daa65b8adfc920ff172
Author: mani <[email protected]>
AuthorDate: Thu Nov 27 18:38:48 2025 +0530

    [YUNIKORN-3155] Leaf Queue Selection process when parent queue quota lowers 
(#1051)
    
    Closes: #1051
    
    Signed-off-by: mani <[email protected]>
---
 pkg/common/resources/resources.go                  |  10 +-
 pkg/common/resources/resources_test.go             |   2 +-
 pkg/log/logger.go                                  |  62 +++---
 pkg/scheduler/objects/quota_change_preemptor.go    | 123 +++++++++--
 .../objects/quota_change_preemptor_test.go         | 236 +++++++++++++++++++++
 5 files changed, 384 insertions(+), 49 deletions(-)

diff --git a/pkg/common/resources/resources.go 
b/pkg/common/resources/resources.go
index 3c51f061..ced4bc81 100644
--- a/pkg/common/resources/resources.go
+++ b/pkg/common/resources/resources.go
@@ -542,7 +542,7 @@ func getFairShare(allocated, guaranteed, fair *Resource) 
float64 {
 // Get the share of each resource quantity when compared to the total
 // resources quantity
 // NOTE: shares can be negative and positive in the current assumptions
-func getShares(res, total *Resource) []float64 {
+func GetShares(res, total *Resource) []float64 {
        // shortcut if the passed in resource to get the share on is nil or 
empty (sparse)
        if res == nil || len(res.Resources) == 0 {
                return make([]float64, 0)
@@ -592,8 +592,8 @@ func getShares(res, total *Resource) []float64 {
 // 1 if the left share is larger
 // -1 if the right share is larger
 func CompUsageRatio(left, right, total *Resource) int {
-       lshares := getShares(left, total)
-       rshares := getShares(right, total)
+       lshares := GetShares(left, total)
+       rshares := GetShares(right, total)
 
        return compareShares(lshares, rshares)
 }
@@ -622,8 +622,8 @@ func CompUsageRatioSeparately(leftAllocated, 
leftGuaranteed, leftFairMax, rightA
 // highest share for right resource from total.
 // If highest share for the right resource is 0 fairness is 1
 func FairnessRatio(left, right, total *Resource) float64 {
-       lshares := getShares(left, total)
-       rshares := getShares(right, total)
+       lshares := GetShares(left, total)
+       rshares := GetShares(right, total)
 
        // Get the largest value from the shares
        lshare := float64(0)
diff --git a/pkg/common/resources/resources_test.go 
b/pkg/common/resources/resources_test.go
index a5f20793..4375a076 100644
--- a/pkg/common/resources/resources_test.go
+++ b/pkg/common/resources/resources_test.go
@@ -1614,7 +1614,7 @@ func TestGetShares(t *testing.T) {
 
        for _, tc := range tests {
                t.Run(tc.message, func(t *testing.T) {
-                       shares := getShares(tc.res, tc.total)
+                       shares := GetShares(tc.res, tc.total)
                        if !reflect.DeepEqual(shares, tc.expected) {
                                t.Errorf("incorrect shares for %s, expected %v 
got: %v", tc.message, tc.expected, shares)
                        }
diff --git a/pkg/log/logger.go b/pkg/log/logger.go
index 6ef29fe9..bbfe3ba3 100644
--- a/pkg/log/logger.go
+++ b/pkg/log/logger.go
@@ -54,43 +54,43 @@ const (
 
 // Defined loggers: when adding new loggers, ids must be sequential, and all 
must be added to the loggers slice in the same order
 var (
-       Core                      = &LoggerHandle{id: 0, name: "core"}
-       Test                      = &LoggerHandle{id: 1, name: "test"}
-       Deprecation               = &LoggerHandle{id: 2, name: "deprecation"}
-       Config                    = &LoggerHandle{id: 3, name: "core.config"}
-       Entrypoint                = &LoggerHandle{id: 4, name: 
"core.entrypoint"}
-       Events                    = &LoggerHandle{id: 5, name: "core.events"}
-       OpenTracing               = &LoggerHandle{id: 6, name: 
"core.opentracing"}
-       Resources                 = &LoggerHandle{id: 7, name: "core.resources"}
-       REST                      = &LoggerHandle{id: 8, name: "core.rest"}
-       RMProxy                   = &LoggerHandle{id: 9, name: "core.rmproxy"}
-       RPC                       = &LoggerHandle{id: 10, name: "core.rpc"}
-       Metrics                   = &LoggerHandle{id: 11, name: "core.metrics"}
-       Scheduler                 = &LoggerHandle{id: 12, name: 
"core.scheduler"}
-       SchedAllocation           = &LoggerHandle{id: 13, name: 
"core.scheduler.allocation"}
-       SchedApplication          = &LoggerHandle{id: 14, name: 
"core.scheduler.application"}
-       SchedAppUsage             = &LoggerHandle{id: 15, name: 
"core.scheduler.application.usage"}
-       SchedContext              = &LoggerHandle{id: 16, name: 
"core.scheduler.context"}
-       SchedFSM                  = &LoggerHandle{id: 17, name: 
"core.scheduler.fsm"}
-       SchedHealth               = &LoggerHandle{id: 18, name: 
"core.scheduler.health"}
-       SchedNode                 = &LoggerHandle{id: 19, name: 
"core.scheduler.node"}
-       SchedPartition            = &LoggerHandle{id: 20, name: 
"core.scheduler.partition"}
-       SchedPreemption           = &LoggerHandle{id: 21, name: 
"core.scheduler.preemption"}
-       SchedQueue                = &LoggerHandle{id: 22, name: 
"core.scheduler.queue"}
-       SchedReservation          = &LoggerHandle{id: 23, name: 
"core.scheduler.reservation"}
-       SchedUGM                  = &LoggerHandle{id: 24, name: 
"core.scheduler.ugm"}
-       SchedNodesUsage           = &LoggerHandle{id: 25, name: 
"core.scheduler.nodesusage"}
-       Security                  = &LoggerHandle{id: 26, name: "core.security"}
-       Utils                     = &LoggerHandle{id: 27, name: "core.utils"}
-       Diagnostics               = &LoggerHandle{id: 28, name: 
"core.diagnostics"}
-       ShedQuotaChangePreemption = &LoggerHandle{id: 29, name: 
"core.scheduler.preemption.quotachange"}
+       Core                       = &LoggerHandle{id: 0, name: "core"}
+       Test                       = &LoggerHandle{id: 1, name: "test"}
+       Deprecation                = &LoggerHandle{id: 2, name: "deprecation"}
+       Config                     = &LoggerHandle{id: 3, name: "core.config"}
+       Entrypoint                 = &LoggerHandle{id: 4, name: 
"core.entrypoint"}
+       Events                     = &LoggerHandle{id: 5, name: "core.events"}
+       OpenTracing                = &LoggerHandle{id: 6, name: 
"core.opentracing"}
+       Resources                  = &LoggerHandle{id: 7, name: 
"core.resources"}
+       REST                       = &LoggerHandle{id: 8, name: "core.rest"}
+       RMProxy                    = &LoggerHandle{id: 9, name: "core.rmproxy"}
+       RPC                        = &LoggerHandle{id: 10, name: "core.rpc"}
+       Metrics                    = &LoggerHandle{id: 11, name: "core.metrics"}
+       Scheduler                  = &LoggerHandle{id: 12, name: 
"core.scheduler"}
+       SchedAllocation            = &LoggerHandle{id: 13, name: 
"core.scheduler.allocation"}
+       SchedApplication           = &LoggerHandle{id: 14, name: 
"core.scheduler.application"}
+       SchedAppUsage              = &LoggerHandle{id: 15, name: 
"core.scheduler.application.usage"}
+       SchedContext               = &LoggerHandle{id: 16, name: 
"core.scheduler.context"}
+       SchedFSM                   = &LoggerHandle{id: 17, name: 
"core.scheduler.fsm"}
+       SchedHealth                = &LoggerHandle{id: 18, name: 
"core.scheduler.health"}
+       SchedNode                  = &LoggerHandle{id: 19, name: 
"core.scheduler.node"}
+       SchedPartition             = &LoggerHandle{id: 20, name: 
"core.scheduler.partition"}
+       SchedPreemption            = &LoggerHandle{id: 21, name: 
"core.scheduler.preemption"}
+       SchedQueue                 = &LoggerHandle{id: 22, name: 
"core.scheduler.queue"}
+       SchedReservation           = &LoggerHandle{id: 23, name: 
"core.scheduler.reservation"}
+       SchedUGM                   = &LoggerHandle{id: 24, name: 
"core.scheduler.ugm"}
+       SchedNodesUsage            = &LoggerHandle{id: 25, name: 
"core.scheduler.nodesusage"}
+       Security                   = &LoggerHandle{id: 26, name: 
"core.security"}
+       Utils                      = &LoggerHandle{id: 27, name: "core.utils"}
+       Diagnostics                = &LoggerHandle{id: 28, name: 
"core.diagnostics"}
+       SchedQuotaChangePreemption = &LoggerHandle{id: 29, name: 
"core.scheduler.preemption.quotachange"}
 )
 
 // this tracks all the known logger handles, used to preallocate the real 
logger instances when configuration changes
 var loggers = []*LoggerHandle{
        Core, Test, Deprecation, Config, Entrypoint, Events, OpenTracing, 
Resources, REST, RMProxy, RPC, Metrics,
        Scheduler, SchedAllocation, SchedApplication, SchedAppUsage, 
SchedContext, SchedFSM, SchedHealth, SchedNode,
-       SchedPartition, SchedPreemption, SchedQueue, SchedReservation, 
SchedUGM, SchedNodesUsage, Security, Utils, Diagnostics, 
ShedQuotaChangePreemption,
+       SchedPartition, SchedPreemption, SchedQueue, SchedReservation, 
SchedUGM, SchedNodesUsage, Security, Utils, Diagnostics, 
SchedQuotaChangePreemption,
 }
 
 // structure to hold all current logger configuration state
diff --git a/pkg/scheduler/objects/quota_change_preemptor.go 
b/pkg/scheduler/objects/quota_change_preemptor.go
index 018247ed..3e70d166 100644
--- a/pkg/scheduler/objects/quota_change_preemptor.go
+++ b/pkg/scheduler/objects/quota_change_preemptor.go
@@ -54,18 +54,47 @@ func (qcp *QuotaChangePreemptionContext) 
CheckPreconditions() bool {
        if !qcp.queue.IsLeafQueue() || !qcp.queue.IsManaged() || 
qcp.queue.IsQuotaChangePreemptionRunning() {
                return false
        }
-       if 
qcp.maxResource.StrictlyGreaterThanOnlyExisting(qcp.queue.GetAllocatedResource())
 {
+       if 
qcp.maxResource.StrictlyGreaterThanOrEqualsOnlyExisting(qcp.queue.GetAllocatedResource())
 {
                return false
        }
        return true
 }
 
 func (qcp *QuotaChangePreemptionContext) tryPreemption() {
+       // Get Preemptable Resource
+       preemptableResource := qcp.getPreemptableResources()
+
+       if !qcp.queue.IsLeafQueue() {
+               leafQueues := make(map[*Queue]*resources.Resource)
+               getChildQueuesPreemptableResource(qcp.queue, 
preemptableResource, leafQueues)
+
+               log.Log(log.SchedQuotaChangePreemption).Info("Triggering quota 
change preemption for parent queue",
+                       zap.String("parent queue", qcp.queue.GetQueuePath()),
+                       zap.String("preemptable resource", 
preemptableResource.String()),
+                       zap.Any("no. of leaf queues with potential victims", 
len(leafQueues)),
+               )
+
+               for leaf, leafPreemptableResource := range leafQueues {
+                       leafQueueQCPC := NewQuotaChangePreemptor(leaf)
+                       
log.Log(log.SchedQuotaChangePreemption).Info("Triggering quota change 
preemption for leaf queue",
+                               zap.String("leaf queue", leaf.GetQueuePath()),
+                               zap.String("max resource", 
leafQueueQCPC.maxResource.String()),
+                               zap.String("guaranteed resource", 
leafQueueQCPC.guaranteedResource.String()),
+                               zap.String("actual allocated resource", 
leafQueueQCPC.allocatedResource.String()),
+                               zap.String("preemptable resource distribution", 
leafPreemptableResource.String()),
+                       )
+                       
leafQueueQCPC.tryPreemptionInternal(leafPreemptableResource)
+               }
+       } else {
+               qcp.tryPreemptionInternal(preemptableResource)
+       }
+}
+
+func (qcp *QuotaChangePreemptionContext) 
tryPreemptionInternal(preemptableResource *resources.Resource) {
        // quota change preemption has started, so mark the flag
        qcp.queue.MarkQuotaChangePreemptionRunning(true)
 
-       // Get Preemptable Resource
-       qcp.preemptableResource = qcp.getPreemptableResources()
+       qcp.preemptableResource = preemptableResource
 
        // Filter the allocations
        qcp.allocations = qcp.filterAllocations()
@@ -83,6 +112,72 @@ func (qcp *QuotaChangePreemptionContext) tryPreemption() {
        qcp.queue.resetPreemptionSettings()
 }
 
+// getChildQueuesPreemptableResource computes each leaf queue's preemptable 
resource distribution from the parent's preemptable resource.
+// Starting with the immediate children of the parent, it computes each child's 
distribution from its parent's preemptable resource and repeats the same
+// recursively for the children at every level until the leaf queues are reached.
+
+// To achieve a fair distribution of the parent's preemptable resource 
among its children,
+// the higher a child's (relative) usage is, the higher its resulting 
preemptable resource.
+// Only usage above guaranteed (if set) is considered when deriving the 
preemptable resource.
+func getChildQueuesPreemptableResource(queue *Queue, parentPreemptableResource 
*resources.Resource, childQueues map[*Queue]*resources.Resource) {
+       children := queue.GetCopyOfChildren()
+       if len(children) == 0 {
+               return
+       }
+
+       // Sum of all children preemptable resources
+       totalPreemptableResource := resources.NewResource()
+
+       // Preemptable resource of all children
+       childrenPreemptableResource := make(map[*Queue]*resources.Resource)
+
+       // Traverse each child and calculate its own preemptable resource. 
The preemptable resource is the amount of resources used above the guaranteed 
resource.
+       // If guaranteed is not set, the entire used resource is treated as 
preemptable resource.
+       // The total preemptable resource (sum of all children's preemptable 
resources) is calculated along the way.
+       for _, child := range children {
+               // Skip the child if it has no usage or its usage is below or equal to 
guaranteed
+               if child.GetAllocatedResource().IsEmpty() || 
child.GetGuaranteedResource().StrictlyGreaterThanOrEqualsOnlyExisting(child.GetAllocatedResource())
 {
+                       continue
+               }
+               var usedResource *resources.Resource
+               if !child.GetGuaranteedResource().IsEmpty() {
+                       usedResource = 
resources.SubOnlyExisting(child.GetGuaranteedResource(), 
child.GetAllocatedResource())
+               } else {
+                       usedResource = child.GetAllocatedResource()
+               }
+               preemptableResource := resources.NewResource()
+               for k, v := range usedResource.Resources {
+                       if v < 0 {
+                               preemptableResource.Resources[k] = v * -1
+                       } else {
+                               preemptableResource.Resources[k] = v
+                       }
+               }
+               childrenPreemptableResource[child] = preemptableResource
+               totalPreemptableResource.AddTo(preemptableResource)
+       }
+
+       // Second pass: Traverse each child and calculate percentage of each 
resource type based on total preemptable resource.
+       // Apply percentage on parent's preemptable resource to derive its 
individual distribution
+       // or share of resources to be preempted.
+       for c, pRes := range childrenPreemptableResource {
+               i := 0
+               childPreemptableResource := resources.NewResource()
+               per := resources.GetShares(pRes, totalPreemptableResource)
+               for k, v := range parentPreemptableResource.Resources {
+                       // Need to be improved further
+                       value := math.RoundToEven(per[i] * float64(v))
+                       childPreemptableResource.Resources[k] = 
resources.Quantity(value)
+                       i++
+               }
+               if c.IsLeafQueue() {
+                       childQueues[c] = childPreemptableResource
+               } else {
+                       getChildQueuesPreemptableResource(c, 
childPreemptableResource, childQueues)
+               }
+       }
+}
+
 // getPreemptableResources Get the preemptable resources for the queue
 // Subtracting the usage from the max resource gives the preemptable resources.
 // It could contain both positive and negative values. Only negative values 
are preemptable.
@@ -140,7 +235,7 @@ func (qcp *QuotaChangePreemptionContext) 
filterAllocations() []*Allocation {
                        allocations = append(allocations, alloc)
                }
        }
-       log.Log(log.ShedQuotaChangePreemption).Info("Filtering allocations",
+       log.Log(log.SchedQuotaChangePreemption).Info("Filtering allocations",
                zap.String("queue", qcp.queue.GetQueuePath()),
                zap.Int("filtered allocations", len(allocations)),
        )
@@ -160,28 +255,32 @@ func (qcp *QuotaChangePreemptionContext) 
sortAllocations() {
 // Otherwise, exceeding above the required resources slightly is acceptable 
for now.
 func (qcp *QuotaChangePreemptionContext) preemptVictims() {
        if len(qcp.allocations) == 0 {
+               log.Log(log.SchedQuotaChangePreemption).Warn("BUG: No victims 
to enforce quota change through preemption",
+                       zap.String("queue", qcp.queue.GetQueuePath()))
                return
        }
-       log.Log(log.ShedQuotaChangePreemption).Info("Found victims for quota 
change preemption",
-               zap.String("queue", qcp.queue.GetQueuePath()),
-               zap.Int("total victims", len(qcp.allocations)))
        apps := make(map[*Application][]*Allocation)
        victimsTotalResource := resources.NewResource()
-       isGuaranteedAndMaxEquals := qcp.maxResource != nil && 
qcp.guaranteedResource != nil && resources.Equals(qcp.maxResource, 
qcp.guaranteedResource)
-       log.Log(log.ShedQuotaChangePreemption).Info("Found victims for quota 
change preemption",
+       log.Log(log.SchedQuotaChangePreemption).Info("Found victims for quota 
change preemption",
                zap.String("queue", qcp.queue.GetQueuePath()),
                zap.Int("total victims", len(qcp.allocations)),
                zap.String("max resources", qcp.maxResource.String()),
                zap.String("guaranteed resources", 
qcp.guaranteedResource.String()),
                zap.String("allocated resources", 
qcp.allocatedResource.String()),
                zap.String("preemptable resources", 
qcp.preemptableResource.String()),
-               zap.Bool("isGuaranteedSet", isGuaranteedAndMaxEquals),
+               zap.Bool("isGuaranteedSet", qcp.guaranteedResource.IsEmpty()),
        )
        for _, victim := range qcp.allocations {
                if 
!qcp.preemptableResource.FitInMaxUndef(victim.GetAllocatedResource()) {
                        continue
                }
                application := qcp.queue.GetApplication(victim.applicationID)
+               if application == nil {
+                       log.Log(log.SchedQuotaChangePreemption).Warn("BUG: 
application not found in queue",
+                               zap.String("queue", qcp.queue.GetQueuePath()),
+                               zap.String("application", victim.applicationID))
+                       continue
+               }
 
                // Keep collecting the victims until preemptable resource 
reaches and subtract the usage
                if 
qcp.preemptableResource.StrictlyGreaterThanOnlyExisting(victimsTotalResource) {
@@ -191,7 +290,7 @@ func (qcp *QuotaChangePreemptionContext) preemptVictims() {
 
                // Has usage gone below the guaranteed resources?
                // If yes, revert the recently added victim steps completely 
and try next victim.
-               if isGuaranteedAndMaxEquals && 
!qcp.allocatedResource.StrictlyGreaterThanOnlyExisting(qcp.guaranteedResource) {
+               if !qcp.guaranteedResource.IsEmpty() && 
qcp.guaranteedResource.StrictlyGreaterThanOnlyExisting(qcp.allocatedResource) {
                        victims := apps[application]
                        exceptRecentlyAddedVictims := victims[:len(victims)-1]
                        apps[application] = exceptRecentlyAddedVictims
@@ -205,7 +304,7 @@ func (qcp *QuotaChangePreemptionContext) preemptVictims() {
        for app, victims := range apps {
                if len(victims) > 0 {
                        for _, victim := range victims {
-                               
log.Log(log.ShedQuotaChangePreemption).Info("Preempting victims for quota 
change preemption",
+                               
log.Log(log.SchedQuotaChangePreemption).Info("Preempting victims for quota 
change preemption",
                                        zap.String("queue", 
qcp.queue.GetQueuePath()),
                                        zap.String("victim allocation key", 
victim.allocationKey),
                                        zap.String("victim allocated 
resources", victim.GetAllocatedResource().String()),
diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go 
b/pkg/scheduler/objects/quota_change_preemptor_test.go
index ec2f3e81..10ab82ab 100644
--- a/pkg/scheduler/objects/quota_change_preemptor_test.go
+++ b/pkg/scheduler/objects/quota_change_preemptor_test.go
@@ -71,6 +71,13 @@ func TestQuotaChangeCheckPreconditions(t *testing.T) {
        assert.NilError(t, err)
        usageExceededMaxQueue.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000})
 
+       usageEqualsMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
+               Name:      "leaf-usage-equals-max",
+               Resources: leafRes,
+       }, parent, false, nil)
+       assert.NilError(t, err)
+       usageEqualsMaxQueue.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000})
+
        testCases := []struct {
                name               string
                queue              *Queue
@@ -81,6 +88,7 @@ func TestQuotaChangeCheckPreconditions(t *testing.T) {
                {"dynamic leaf queue", dynamicLeaf, false},
                {"leaf queue, already preemption process started or running", 
alreadyPreemptionRunning, false},
                {"leaf queue, usage exceeded max resources", 
usageExceededMaxQueue, true},
+               {"leaf queue, usage equals max resources", usageEqualsMaxQueue, 
false},
        }
        for _, tc := range testCases {
                t.Run(tc.name, func(t *testing.T) {
@@ -259,6 +267,234 @@ func TestQuotaChangeTryPreemption(t *testing.T) {
        }
 }
 
+// TestQuotaChangeGetLeafQueuesPreemptableResource Test leaf queues 
distribution from parent's preemptable resources under different circumstances
+// Queue Structure:
+// parent
+//
+//     leaf1 (Guaranteed set for this hierarchy)
+//             leaf11
+//                     leaf111
+//             leaf12
+//     leaf2 (Guaranteed not set for this hierarchy)
+//             leaf21
+//                     leaf211
+//             leaf22
+//     leaf3 (No usage)
+//     leaf4 (Guaranteed set but equals usage)
+func TestQuotaChangeGetLeafQueuesPreemptableResource(t *testing.T) {
+       parentConfig := configs.QueueConfig{Name: "parent", Parent: true}
+       parent, err := NewConfiguredQueue(parentConfig, nil, false, nil)
+       assert.NilError(t, err)
+
+       leaf111, leaf12, leaf211, leaf22, leaf4 := createQueueSetups(t, parent, 
configs.Resources{Guaranteed: map[string]string{"first": "10"}}, 
configs.Resources{})
+
+       parent.GetChildQueue("leaf1").allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
+       parent.GetChildQueue("leaf2").allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 80})
+       leaf4.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+       parent.GetChildQueue("leaf1").GetChildQueue("leaf11").allocatedResource 
= resources.NewResourceFromMap(map[string]resources.Quantity{"first": 30})
+       parent.GetChildQueue("leaf1").GetChildQueue("leaf12").allocatedResource 
= resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20})
+       parent.GetChildQueue("leaf2").GetChildQueue("leaf21").allocatedResource 
= resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
+       leaf22.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 30})
+       leaf111.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 30})
+       leaf211.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
+
+       testCases := []struct {
+               name              string
+               parentQueue       *Queue
+               parentPreemptable *resources.Resource
+               leaf111PRes       *resources.Resource
+               leaf12PRes        *resources.Resource
+               leaf211PRes       *resources.Resource
+               leaf22PRes        *resources.Resource
+       }{
+               {"normal preemptable resources  - normal distribution", parent, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100}),
+                       
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 22}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}),
+                       
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 42}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 25})},
+
+               {"twice the preemptable resources - twice the normal 
distribution", parent, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 200}),
+                       
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 45}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 22}),
+                       
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 83}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})},
+
+               {"half the preemptable resources - half the normal 
distribution", parent, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50}),
+                       
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 6}),
+                       
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 21}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 12})},
+       }
+       for _, tc := range testCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       childQueues := make(map[*Queue]*resources.Resource)
+                       getChildQueuesPreemptableResource(tc.parentQueue, 
tc.parentPreemptable, childQueues)
+                       assert.Equal(t, len(childQueues), 4)
+                       assert.Equal(t, resources.Equals(childQueues[leaf111], 
tc.leaf111PRes), true)
+                       assert.Equal(t, resources.Equals(childQueues[leaf12], 
tc.leaf12PRes), true)
+                       assert.Equal(t, resources.Equals(childQueues[leaf211], 
tc.leaf211PRes), true)
+                       assert.Equal(t, resources.Equals(childQueues[leaf22], 
tc.leaf22PRes), true)
+                       if _, ok := childQueues[parent.GetChildQueue("leaf3")]; 
ok {
+                               t.Fatal("leaf 3 queue exists")
+                       }
+                       if _, ok := childQueues[parent.GetChildQueue("leaf4")]; 
ok {
+                               t.Fatal("leaf 4 queue exists")
+                       }
+               })
+       }
+}
+
+func TestQuotaChangeTryPreemptionForParentQueue(t *testing.T) {
+       node := NewNode(&si.NodeInfo{
+               NodeID:     "node",
+               Attributes: nil,
+               SchedulableResource: &si.Resource{
+                       Resources: map[string]*si.Quantity{"first": {Value: 
200}},
+               },
+       })
+
+       parentConfig := configs.QueueConfig{Name: "parent", Parent: true}
+       parent, err := NewConfiguredQueue(parentConfig, nil, false, nil)
+       assert.NilError(t, err)
+
+       parentConfig1 := configs.QueueConfig{Name: "parent1", Parent: true}
+       parent1, err := NewConfiguredQueue(parentConfig1, nil, false, nil)
+       assert.NilError(t, err)
+
+       leaf111G, leaf12G, leaf211G, leaf22G, leaf4G := createQueueSetups(t, 
parent, configs.Resources{Guaranteed: map[string]string{"first": "10"}}, 
configs.Resources{})
+       leaf111, leaf12, leaf211, leaf22, leaf4 := createQueueSetups(t, 
parent1, configs.Resources{}, configs.Resources{})
+
+       suitableVictims := make([]*Allocation, 0)
+       suitableVictims = append(suitableVictims, createVictim(t, "ask1", node, 
5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})))
+       suitableVictims = append(suitableVictims, createVictim(t, "ask2", node, 
4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})))
+       suitableVictims = append(suitableVictims, createVictim(t, "ask3", node, 
4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})))
+
+       leafGVictims := make(map[*Queue][]*Allocation)
+       leafGVictims[leaf111G] = suitableVictims
+       leafVictims := make(map[*Queue][]*Allocation)
+       leafVictims[leaf111] = suitableVictims
+
+       suitableVictims1 := make([]*Allocation, 0)
+       suitableVictims1 = append(suitableVictims1, createVictim(t, "ask4", 
node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
10})))
+       suitableVictims1 = append(suitableVictims1, createVictim(t, "ask5", 
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
10})))
+       leafGVictims[leaf12G] = suitableVictims1
+       leafVictims[leaf12] = suitableVictims1
+
+       suitableVictims2 := make([]*Allocation, 0)
+       suitableVictims2 = append(suitableVictims2, createVictim(t, "ask6", 
node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
10})))
+       suitableVictims2 = append(suitableVictims2, createVictim(t, "ask7", 
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
10})))
+       suitableVictims2 = append(suitableVictims2, createVictim(t, "ask8", 
node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
10})))
+       suitableVictims2 = append(suitableVictims2, createVictim(t, "ask9", 
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
10})))
+       suitableVictims2 = append(suitableVictims2, createVictim(t, "ask10", 
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
10})))
+       leafGVictims[leaf211G] = suitableVictims2
+       leafVictims[leaf211] = suitableVictims2
+
+       suitableVictims3 := make([]*Allocation, 0)
+       suitableVictims3 = append(suitableVictims3, createVictim(t, "ask11", 
node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
10})))
+       suitableVictims3 = append(suitableVictims3, createVictim(t, "ask12", 
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
10})))
+       suitableVictims3 = append(suitableVictims3, createVictim(t, "ask13", 
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
10})))
+
+       leafGVictims[leaf22G] = suitableVictims3
+       leafVictims[leaf22] = suitableVictims3
+
+       v := createVictim(t, "ask14", node, 5, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
+       leafGVictims[leaf4G] = []*Allocation{v}
+       leafVictims[leaf4] = []*Allocation{v}
+
+       expectedGVictims := make(map[*Queue]int)
+       expectedGVictims[leaf111G] = 2
+       expectedGVictims[leaf12G] = 1
+       expectedGVictims[leaf211G] = 5
+       expectedGVictims[leaf22G] = 3
+
+       expectedVictims := make(map[*Queue]int)
+       expectedVictims[leaf111] = 3
+       expectedVictims[leaf12] = 2
+       expectedVictims[leaf211] = 5
+       expectedVictims[leaf22] = 3
+
+       oldMax := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 130})
+       newMax := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+
+       testCases := []struct {
+               name            string
+               queue           *Queue
+               oldMax          *resources.Resource
+               newMax          *resources.Resource
+               victims         map[*Queue][]*Allocation
+               expectedVictims map[*Queue]int
+       }{
+               {"Guaranteed set on one side of queue hierarchy - suitable 
victims available", parent, oldMax, newMax, leafGVictims, expectedGVictims},
+               {"Guaranteed set not set on any queue - suitable victims 
available", parent1, oldMax, newMax, leafVictims, expectedVictims},
+       }
+       for _, tc := range testCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       tc.queue.maxResource = tc.oldMax
+                       for q, v := range tc.victims {
+                               assignAllocationsToQueue(v, q)
+                       }
+                       tc.queue.maxResource = tc.newMax
+                       preemptor := NewQuotaChangePreemptor(tc.queue)
+                       preemptor.tryPreemption()
+                       for q, asks := range tc.victims {
+                               var victimsCount int
+                               for _, a := range asks {
+                                       if a.IsPreempted() {
+                                               victimsCount++
+                                       }
+                               }
+                               assert.Equal(t, victimsCount, 
tc.expectedVictims[q])
+                       }
+                       for _, v := range tc.victims {
+                               removeAllocationAsks(node, v)
+                       }
+                       resetQueue(tc.queue)
+               })
+       }
+}
+
+// createQueueSetups builds a queue hierarchy below parent and returns the
+// queues leaf111, leaf12, leaf211, leaf22 and leaf4, in that order.
+// Brackets show the config used; leafResG presumably sets guaranteed (verify).
+//
+//     leaf1 [leafResG] (parent queue)
+//             leaf11 [leafResG] (parent queue)
+//                     leaf111 [leafResG]
+//             leaf12 [leafResG]
+//     leaf2 [leafRes] (parent queue)
+//             leaf21 [leafRes] (parent queue)
+//                     leaf211 [leafRes]
+//             leaf22 [leafRes]
+//     leaf3 [leafRes] (created, not returned)
+//     leaf4 [leafResG]
+func createQueueSetups(t *testing.T, parent *Queue, leafResG 
configs.Resources, leafRes configs.Resources) (*Queue, *Queue, *Queue, *Queue, 
*Queue) {
+       leaf1, err := NewConfiguredQueue(configs.QueueConfig{Name: "leaf1", 
Parent: true, Resources: leafResG}, parent, false, nil)
+       assert.NilError(t, err)
+
+       leaf2, err := NewConfiguredQueue(configs.QueueConfig{Name: "leaf2", 
Parent: true, Resources: leafRes}, parent, false, nil)
+       assert.NilError(t, err)
+
+       _, err = NewConfiguredQueue(configs.QueueConfig{Name: "leaf3", 
Resources: leafRes}, parent, false, nil)
+       assert.NilError(t, err)
+
+       leaf4, err := NewConfiguredQueue(configs.QueueConfig{Name: "leaf4", 
Resources: leafResG}, parent, false, nil)
+       assert.NilError(t, err)
+
+       leaf11, err := NewConfiguredQueue(configs.QueueConfig{Name: "leaf11", 
Parent: true, Resources: leafResG}, leaf1, false, nil)
+       assert.NilError(t, err)
+
+       leaf12, err := NewConfiguredQueue(configs.QueueConfig{Name: "leaf12", 
Resources: leafResG}, leaf1, false, nil)
+       assert.NilError(t, err)
+
+       leaf21, err := NewConfiguredQueue(configs.QueueConfig{Name: "leaf21", 
Parent: true, Resources: leafRes}, leaf2, false, nil)
+       assert.NilError(t, err)
+
+       leaf22, err := NewConfiguredQueue(configs.QueueConfig{Name: "leaf22", 
Resources: leafRes}, leaf2, false, nil)
+       assert.NilError(t, err)
+
+       leaf111, err := NewConfiguredQueue(configs.QueueConfig{Name: "leaf111", 
Resources: leafResG}, leaf11, false, nil)
+       assert.NilError(t, err)
+
+       leaf211, err := NewConfiguredQueue(configs.QueueConfig{Name: "leaf211", 
Resources: leafRes}, leaf21, false, nil)
+       assert.NilError(t, err)
+
+       return leaf111, leaf12, leaf211, leaf22, leaf4
+}
+
 func createVictim(t *testing.T, allocKey string, node *Node, adjustment int, 
allocRes *resources.Resource) *Allocation {
        createTime := time.Now()
        allocation := createAllocation(allocKey, "app1", node.NodeID, true, 
false, 10, false, allocRes)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to