This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git

commit 70fc74fab1afd1dcd038c768182c1c2a6d153b05
Author: mani <[email protected]>
AuthorDate: Sat Dec 13 14:40:21 2025 +0530

    [YUNIKORN-3157] Calculate Preemptable Resource for leaf Queue with 
different resource types (#1055)
    
    Closes: #1055
    
    Signed-off-by: mani <[email protected]>
    (cherry picked from commit 0c96ff1c28039092aa4a4504effa472e36aa8ba8)
---
 pkg/common/resources/resources.go                  |  33 +++-
 pkg/scheduler/objects/quota_change_preemptor.go    |  15 +-
 .../objects/quota_change_preemptor_test.go         | 207 ++++++++++++++++++++-
 3 files changed, 233 insertions(+), 22 deletions(-)

diff --git a/pkg/common/resources/resources.go 
b/pkg/common/resources/resources.go
index ced4bc81..bb3df50a 100644
--- a/pkg/common/resources/resources.go
+++ b/pkg/common/resources/resources.go
@@ -539,15 +539,29 @@ func getFairShare(allocated, guaranteed, fair *Resource) 
float64 {
        return maxShare
 }
 
-// Get the share of each resource quantity when compared to the total
-// resources quantity
+// GetShares Get the share of each resource quantity when compared to the total
+// resource quantity. Resource type agnostic. Sort shares to order the same 
based on dominant resource.
 // NOTE: shares can be negative and positive in the current assumptions
 func GetShares(res, total *Resource) []float64 {
+       shares, _ := internalGetShares(res, total)
+       return shares
+}
+
+// GetSharesTypeWise Get the share of each resource quantity when compared to 
the total
+// resources quantity
+// NOTE: shares can be negative and positive in the current assumptions
+func GetSharesTypeWise(res, total *Resource) map[string]float64 {
+       _, sharesTypeWise := internalGetShares(res, total)
+       return sharesTypeWise
+}
+
+func internalGetShares(res, total *Resource) ([]float64, map[string]float64) {
        // shortcut if the passed in resource to get the share on is nil or 
empty (sparse)
        if res == nil || len(res.Resources) == 0 {
-               return make([]float64, 0)
+               return make([]float64, 0), make(map[string]float64)
        }
        shares := make([]float64, len(res.Resources))
+       sharesTypeWise := make(map[string]float64, len(res.Resources))
        idx := 0
        for k, v := range res.Resources {
                // no usage then there is no share (skip prevents NaN)
@@ -566,13 +580,18 @@ func GetShares(res, total *Resource) []float64 {
                                        zap.String("resource key", k),
                                        zap.Int64("resource quantity", 
int64(v)))
                        }
-                       shares[idx] = float64(v)
+                       share := float64(v)
+                       shares[idx] = share
+                       sharesTypeWise[k] = share
                        idx++
                        continue
                }
-               shares[idx] = float64(v) / float64(total.Resources[k])
+               share := float64(v) / float64(total.Resources[k])
+               shares[idx] = share
+               sharesTypeWise[k] = share
+
                // negative share is logged
-               if shares[idx] < 0 {
+               if shares[idx] < 0 || sharesTypeWise[k] < 0 {
                        log.Log(log.Resources).Debug("share set is negative",
                                zap.String("resource key", k),
                                zap.Int64("resource quantity", int64(v)),
@@ -583,7 +602,7 @@ func GetShares(res, total *Resource) []float64 {
 
        // sort in increasing order, NaN can not be part of the list
        sort.Float64s(shares)
-       return shares
+       return shares, sharesTypeWise
 }
 
 // Calculate share for left of total and right of total.
diff --git a/pkg/scheduler/objects/quota_change_preemptor.go 
b/pkg/scheduler/objects/quota_change_preemptor.go
index 3e70d166..8f8a024d 100644
--- a/pkg/scheduler/objects/quota_change_preemptor.go
+++ b/pkg/scheduler/objects/quota_change_preemptor.go
@@ -51,7 +51,7 @@ func NewQuotaChangePreemptor(queue *Queue) 
*QuotaChangePreemptionContext {
 }
 
 func (qcp *QuotaChangePreemptionContext) CheckPreconditions() bool {
-       if !qcp.queue.IsLeafQueue() || !qcp.queue.IsManaged() || 
qcp.queue.IsQuotaChangePreemptionRunning() {
+       if !qcp.queue.IsManaged() || qcp.queue.IsQuotaChangePreemptionRunning() 
{
                return false
        }
        if 
qcp.maxResource.StrictlyGreaterThanOrEqualsOnlyExisting(qcp.queue.GetAllocatedResource())
 {
@@ -161,14 +161,13 @@ func getChildQueuesPreemptableResource(queue *Queue, 
parentPreemptableResource *
        // Apply percentage on parent's preemptable resource to derive its 
individual distribution
        // or share of resources to be preempted.
        for c, pRes := range childrenPreemptableResource {
-               i := 0
                childPreemptableResource := resources.NewResource()
-               per := resources.GetShares(pRes, totalPreemptableResource)
-               for k, v := range parentPreemptableResource.Resources {
-                       // Need to be improved further
-                       value := math.RoundToEven(per[i] * float64(v))
-                       childPreemptableResource.Resources[k] = 
resources.Quantity(value)
-                       i++
+               per := resources.GetSharesTypeWise(pRes, 
totalPreemptableResource)
+               for k := range pRes.Resources {
+                       if _, ok := parentPreemptableResource.Resources[k]; ok {
+                               value := math.RoundToEven(per[k] * 
float64(parentPreemptableResource.Resources[k]))
+                               childPreemptableResource.Resources[k] = 
resources.Quantity(value)
+                       }
                }
                if c.IsLeafQueue() {
                        childQueues[c] = childPreemptableResource
diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go 
b/pkg/scheduler/objects/quota_change_preemptor_test.go
index 10ab82ab..267e8bbd 100644
--- a/pkg/scheduler/objects/quota_change_preemptor_test.go
+++ b/pkg/scheduler/objects/quota_change_preemptor_test.go
@@ -40,6 +40,7 @@ func TestQuotaChangeCheckPreconditions(t *testing.T) {
        }
        parent, err := NewConfiguredQueue(parentConfig, nil, false, nil)
        assert.NilError(t, err)
+       parent.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000, 
"cpu": 2000})
 
        leafRes := configs.Resources{
                Max: map[string]string{"memory": "1000"},
@@ -69,26 +70,36 @@ func TestQuotaChangeCheckPreconditions(t *testing.T) {
                Resources: leafRes,
        }, parent, false, nil)
        assert.NilError(t, err)
-       usageExceededMaxQueue.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000})
+       usageExceededMaxQueue.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000, 
"cpu": 2000})
 
        usageEqualsMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
                Name:      "leaf-usage-equals-max",
                Resources: leafRes,
        }, parent, false, nil)
        assert.NilError(t, err)
-       usageEqualsMaxQueue.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000})
+       usageEqualsMaxQueue.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000, 
"cpu": 1000})
+
+       usageNotMatchingMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
+               Name: "leaf-usage-res-not-matching-max-res",
+               Resources: configs.Resources{
+                       Max: map[string]string{"cpu": "1000"},
+               },
+       }, parent, false, nil)
+       assert.NilError(t, err)
+       usageNotMatchingMaxQueue.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000})
 
        testCases := []struct {
                name               string
                queue              *Queue
                preconditionResult bool
        }{
-               {"parent queue", parent, false},
+               {"parent queue", parent, true},
                {"leaf queue", leaf, false},
                {"dynamic leaf queue", dynamicLeaf, false},
                {"leaf queue, already preemption process started or running", 
alreadyPreemptionRunning, false},
                {"leaf queue, usage exceeded max resources", 
usageExceededMaxQueue, true},
                {"leaf queue, usage equals max resources", usageEqualsMaxQueue, 
false},
+               {"leaf queue, usage res not matching max resources", 
usageNotMatchingMaxQueue, false},
        }
        for _, tc := range testCases {
                t.Run(tc.name, func(t *testing.T) {
@@ -116,8 +127,7 @@ func TestQuotaChangeGetPreemptableResource(t *testing.T) {
                {"nil used", leaf, 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000}), 
nil, nil},
                {"used below max", leaf, 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 500}), 
nil},
                {"used above max", leaf, 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1500}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 500})},
-               {"used above max in specific res type", leaf, 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000, 
"cpu": 10}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1500}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 500})},
-               {"used above max and below max in specific res type", leaf, 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000, 
"cpu": 10}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1500, 
"cpu": 10}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 500})},
+               {"used above max, below max, equals max in specific res type 
and also with extra res types", leaf, 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000, 
"cpu": 10, "gpu": 10}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1500, 
"cpu": 10, "gpu": 9}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 500})},
                {"used res type but max undefined", leaf, 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 150}), nil},
        }
        for _, tc := range testCases {
@@ -267,7 +277,119 @@ func TestQuotaChangeTryPreemption(t *testing.T) {
        }
 }
 
-// TestQuotaChangeGetLeafQueuesPreemptableResource Test leaf queues 
distribution from parent's preemptable resources under different circumstances
+func TestQuotaChangeTryPreemptionWithDifferentResTypes(t *testing.T) {
+       leaf, err := NewConfiguredQueue(configs.QueueConfig{
+               Name: "leaf",
+       }, nil, false, nil)
+       assert.NilError(t, err)
+
+       node := NewNode(&si.NodeInfo{
+               NodeID:     "node",
+               Attributes: nil,
+               SchedulableResource: &si.Resource{
+                       Resources: map[string]*si.Quantity{"first": {Value: 
100}, "second": {Value: 200}},
+               },
+       })
+
+       suitableVictims := make([]*Allocation, 0)
+       overflowVictims := make([]*Allocation, 0)
+       oversizedVictims := make([]*Allocation, 0)
+
+       suitableVictims = append(suitableVictims, createVictim(t, "ask1", node, 
5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, 
"second": 10})))
+       suitableVictims = append(suitableVictims, createVictim(t, "ask2", node, 
4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, 
"second": 10})))
+
+       oversizedVictims = append(oversizedVictims, createVictim(t, "ask21", 
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9, 
"second": 10})))
+       oversizedVictims = append(oversizedVictims, createVictim(t, "ask3", 
node, 3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
11, "second": 10})))
+
+       overflowVictims = append(overflowVictims, createVictim(t, "ask4", node, 
3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5, 
"second": 10})))
+       overflowVictims = append(overflowVictims, createVictim(t, "ask41", 
node, 2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 6, 
"second": 10})))
+       overflowVictims = append(overflowVictims, createVictim(t, "ask42", 
node, 1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9, 
"second": 10})))
+
+       oldMax := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20})
+       newMax := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+       newMaxWithNewResTypes := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, 
"third": 10})
+       newMaxWithRemovedResTypes := 
resources.NewResourceFromMap(map[string]resources.Quantity{"second": 10})
+       lowerGuaranteed := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
+       lowerGuaranteedWithNewResTypes := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5, 
"fourth": 5})
+
+       type test struct {
+               allocs               []*Allocation
+               totalExpectedVictims int
+               expectedVictimsCount int
+       }
+
+       testCases := []struct {
+               name       string
+               queue      *Queue
+               oldMax     *resources.Resource
+               newMax     *resources.Resource
+               guaranteed *resources.Resource
+               victims    []test
+       }{
+               {"oversized victims available with extra resource types", leaf, 
oldMax, newMax, nil,
+                       []test{
+                               {oversizedVictims, 2, 1},
+                       },
+               },
+               {"suitable victims available with extra resource types other 
than defined in max", leaf, oldMax, newMax, nil,
+                       []test{
+                               {suitableVictims, 2, 1},
+                       },
+               },
+               {"suitable victims available with extra resource types other 
than defined in max", leaf, nil, newMax, nil,
+                       []test{
+                               {suitableVictims, 2, 1},
+                       },
+               },
+               {"suitable victims available with extra resource types other 
than defined in guaranteed", leaf, nil, newMax, lowerGuaranteed,
+                       []test{
+                               {suitableVictims, 2, 1},
+                       },
+               },
+               {"suitable victims available - different res types, adding new 
res type in max", leaf, oldMax, newMaxWithNewResTypes, nil,
+                       []test{
+                               {suitableVictims, 2, 1},
+                       },
+               },
+               {"suitable victims available - different res types, removing 
existing res type from max", leaf, oldMax, newMaxWithRemovedResTypes, nil,
+                       []test{
+                               {suitableVictims, 2, 1},
+                       },
+               },
+               {"overflow victims available with extra resource types other 
than defined in guaranteed and vice versa", leaf, oldMax, newMax, 
lowerGuaranteedWithNewResTypes,
+                       []test{
+                               {overflowVictims, 3, 2},
+                       },
+               },
+       }
+
+       for _, tc := range testCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       for _, v := range tc.victims {
+                               leaf.maxResource = tc.oldMax
+                               leaf.guaranteedResource = tc.guaranteed
+                               asks := v.allocs
+                               assignAllocationsToQueue(asks, leaf)
+                               leaf.maxResource = tc.newMax
+                               leaf.guaranteedResource = tc.guaranteed
+                               preemptor := NewQuotaChangePreemptor(tc.queue)
+                               preemptor.tryPreemption()
+                               assert.Equal(t, len(preemptor.getVictims()), 
v.totalExpectedVictims)
+                               var victimsCount int
+                               for _, a := range asks {
+                                       if a.IsPreempted() {
+                                               victimsCount++
+                                       }
+                               }
+                               assert.Equal(t, victimsCount, 
v.expectedVictimsCount)
+                               removeAllocationAsks(node, asks)
+                               resetQueue(leaf)
+                       }
+               })
+       }
+}
+
+// TestQuotaChangeGetChildQueuesPreemptableResource Test child queues 
distribution from parent's preemptable resources under different circumstances
 // Queue Structure:
 // parent
 //
@@ -281,7 +403,7 @@ func TestQuotaChangeTryPreemption(t *testing.T) {
 //             leaf22
 //     leaf3 (No usage)
 //     leaf4 (Guaranteed set but equals usage)
-func TestQuotaChangeGetLeafQueuesPreemptableResource(t *testing.T) {
+func TestQuotaChangeGetChildQueuesPreemptableResource(t *testing.T) {
        parentConfig := configs.QueueConfig{Name: "parent", Parent: true}
        parent, err := NewConfiguredQueue(parentConfig, nil, false, nil)
        assert.NilError(t, err)
@@ -338,6 +460,77 @@ func TestQuotaChangeGetLeafQueuesPreemptableResource(t 
*testing.T) {
        }
 }
 
+// TestQuotaChangeGetChildQueuesPreemptableResourceWithDifferentResTypes Test 
child queues distribution from parent's preemptable resources under different 
circumstances with different resource types
+// Queue Structure:
+// parent
+//
+//     leaf 1 (Guaranteed set for this hierarchy)
+//             leaf11
+//                     leaf111
+//             leaf12
+//     leaf2 (Guaranteed not set for this hierarchy)
+//             leaf21
+//                     leaf211
+//             leaf22
+//     leaf3 (No usage)
+//     leaf4 (Guaranteed set but equals usage)
+func TestQuotaChangeGetChildQueuesPreemptableResourceWithDifferentResTypes(t 
*testing.T) {
+       parentConfig := configs.QueueConfig{Name: "parent", Parent: true}
+       parent, err := NewConfiguredQueue(parentConfig, nil, false, nil)
+       assert.NilError(t, err)
+
+       leaf111, leaf12, leaf211, leaf22, leaf4 := createQueueSetups(t, parent, 
configs.Resources{Guaranteed: map[string]string{"first": "10", "third": "10"}}, 
configs.Resources{})
+
+       parent.GetChildQueue("leaf1").allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50, 
"second": 50})
+       parent.GetChildQueue("leaf2").allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 80})
+       leaf4.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, 
"second": 10})
+       parent.GetChildQueue("leaf1").GetChildQueue("leaf11").allocatedResource 
= resources.NewResourceFromMap(map[string]resources.Quantity{"first": 30, 
"second": 30})
+       parent.GetChildQueue("leaf1").GetChildQueue("leaf12").allocatedResource 
= resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20, 
"second": 20})
+       parent.GetChildQueue("leaf2").GetChildQueue("leaf21").allocatedResource 
= resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
+       leaf22.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 30})
+       leaf111.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 30, 
"second": 30})
+       leaf211.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
+
+       testCases := []struct {
+               name              string
+               parentQueue       *Queue
+               parentPreemptable *resources.Resource
+               leaf111PRes       *resources.Resource
+               leaf12PRes        *resources.Resource
+               leaf211PRes       *resources.Resource
+               leaf22PRes        *resources.Resource
+       }{
+               {"normal preemptable resources  - normal distribution", parent, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100, 
"fourth": 100}),
+                       
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 22}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}),
+                       
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 42}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 25})},
+
+               {"twice the preemptable resources - twice the normal 
distribution", parent, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 200, 
"fourth": 100}),
+                       
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 45}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 22}),
+                       
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 83}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})},
+
+               {"half the preemptable resources - half the normal 
distribution", parent, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50, 
"fourth": 100}),
+                       
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 6}),
+                       
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 21}), 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 12})},
+       }
+       for _, tc := range testCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       childQueues := make(map[*Queue]*resources.Resource)
+                       getChildQueuesPreemptableResource(tc.parentQueue, 
tc.parentPreemptable, childQueues)
+                       assert.Equal(t, len(childQueues), 4)
+                       assert.Equal(t, resources.Equals(childQueues[leaf111], 
tc.leaf111PRes), true)
+                       assert.Equal(t, resources.Equals(childQueues[leaf12], 
tc.leaf12PRes), true)
+                       assert.Equal(t, resources.Equals(childQueues[leaf211], 
tc.leaf211PRes), true)
+                       assert.Equal(t, resources.Equals(childQueues[leaf22], 
tc.leaf22PRes), true)
+                       if _, ok := childQueues[parent.GetChildQueue("leaf3")]; 
ok {
+                               t.Fatal("leaf 3 queue exists")
+                       }
+                       if _, ok := childQueues[parent.GetChildQueue("leaf4")]; 
ok {
+                               t.Fatal("leaf 4 queue exists")
+                       }
+               })
+       }
+}
+
 func TestQuotaChangeTryPreemptionForParentQueue(t *testing.T) {
        node := NewNode(&si.NodeInfo{
                NodeID:     "node",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to