This is an automated email from the ASF dual-hosted git repository. pbacsko pushed a commit to branch branch-1.8 in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
commit 70fc74fab1afd1dcd038c768182c1c2a6d153b05 Author: mani <[email protected]> AuthorDate: Sat Dec 13 14:40:21 2025 +0530 [YUNIKORN-3157] Calculate Preemptable Resource for leaf Queue with different resource types (#1055) Closes: #1055 Signed-off-by: mani <[email protected]> (cherry picked from commit 0c96ff1c28039092aa4a4504effa472e36aa8ba8) --- pkg/common/resources/resources.go | 33 +++- pkg/scheduler/objects/quota_change_preemptor.go | 15 +- .../objects/quota_change_preemptor_test.go | 207 ++++++++++++++++++++- 3 files changed, 233 insertions(+), 22 deletions(-) diff --git a/pkg/common/resources/resources.go b/pkg/common/resources/resources.go index ced4bc81..bb3df50a 100644 --- a/pkg/common/resources/resources.go +++ b/pkg/common/resources/resources.go @@ -539,15 +539,29 @@ func getFairShare(allocated, guaranteed, fair *Resource) float64 { return maxShare } -// Get the share of each resource quantity when compared to the total -// resources quantity +// GetShares Get the share of each resource quantity when compared to the total +// resource quantity. Resource type agnostic. Sort shares to order the same based on dominant resource. // NOTE: shares can be negative and positive in the current assumptions func GetShares(res, total *Resource) []float64 { + shares, _ := internalGetShares(res, total) + return shares +} + +// GetSharesTypeWise Get the share of each resource quantity when compared to the total +// resources quantity +// NOTE: shares can be negative and positive in the current assumptions +func GetSharesTypeWise(res, total *Resource) map[string]float64 { + _, sharesTypeWise := internalGetShares(res, total) + return sharesTypeWise +} + +func internalGetShares(res, total *Resource) ([]float64, map[string]float64) { // shortcut if the passed in resource to get the share on is nil or empty (sparse) if res == nil || len(res.Resources) == 0 { - return make([]float64, 0) + return make([]float64, 0), make(map[string]float64) } shares := make([]float64, len(res.Resources)) + sharesTypeWise := make(map[string]float64, len(res.Resources)) idx := 0 for k, v := range res.Resources { // no usage then there is no share (skip prevents NaN) @@ -566,13 +580,18 @@ func GetShares(res, total *Resource) []float64 { zap.String("resource key", k), zap.Int64("resource quantity", int64(v))) } - shares[idx] = float64(v) + share := float64(v) + shares[idx] = share + sharesTypeWise[k] = share idx++ continue } - shares[idx] = float64(v) / float64(total.Resources[k]) + share := float64(v) / float64(total.Resources[k]) + shares[idx] = share + sharesTypeWise[k] = share + // negative share is logged - if shares[idx] < 0 { + if shares[idx] < 0 || sharesTypeWise[k] < 0 { log.Log(log.Resources).Debug("share set is negative", zap.String("resource key", k), zap.Int64("resource quantity", int64(v)), @@ -583,7 +602,7 @@ func GetShares(res, total *Resource) []float64 { // sort in increasing order, NaN can not be part of the list sort.Float64s(shares) - return shares + return shares, sharesTypeWise } // Calculate share for left of total and right of total. diff --git a/pkg/scheduler/objects/quota_change_preemptor.go b/pkg/scheduler/objects/quota_change_preemptor.go index 3e70d166..8f8a024d 100644 --- a/pkg/scheduler/objects/quota_change_preemptor.go +++ b/pkg/scheduler/objects/quota_change_preemptor.go @@ -51,7 +51,7 @@ func NewQuotaChangePreemptor(queue *Queue) *QuotaChangePreemptionContext { } func (qcp *QuotaChangePreemptionContext) CheckPreconditions() bool { - if !qcp.queue.IsLeafQueue() || !qcp.queue.IsManaged() || qcp.queue.IsQuotaChangePreemptionRunning() { + if !qcp.queue.IsManaged() || qcp.queue.IsQuotaChangePreemptionRunning() { return false } if qcp.maxResource.StrictlyGreaterThanOrEqualsOnlyExisting(qcp.queue.GetAllocatedResource()) { @@ -161,14 +161,13 @@ func getChildQueuesPreemptableResource(queue *Queue, parentPreemptableResource * // Apply percentage on parent's preemptable resource to derive its individual distribution // or share of resources to be preempted. for c, pRes := range childrenPreemptableResource { - i := 0 childPreemptableResource := resources.NewResource() - per := resources.GetShares(pRes, totalPreemptableResource) - for k, v := range parentPreemptableResource.Resources { - // Need to be improved further - value := math.RoundToEven(per[i] * float64(v)) - childPreemptableResource.Resources[k] = resources.Quantity(value) - i++ + per := resources.GetSharesTypeWise(pRes, totalPreemptableResource) + for k := range pRes.Resources { + if _, ok := parentPreemptableResource.Resources[k]; ok { + value := math.RoundToEven(per[k] * float64(parentPreemptableResource.Resources[k])) + childPreemptableResource.Resources[k] = resources.Quantity(value) + } } if c.IsLeafQueue() { childQueues[c] = childPreemptableResource diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go b/pkg/scheduler/objects/quota_change_preemptor_test.go index 10ab82ab..267e8bbd 100644 --- a/pkg/scheduler/objects/quota_change_preemptor_test.go +++ b/pkg/scheduler/objects/quota_change_preemptor_test.go @@ -40,6 +40,7 @@ func TestQuotaChangeCheckPreconditions(t *testing.T) { } parent, err := NewConfiguredQueue(parentConfig, nil, false, nil) assert.NilError(t, err) + parent.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000, "cpu": 2000}) leafRes := configs.Resources{ Max: map[string]string{"memory": "1000"}, @@ -69,26 +70,36 @@ func TestQuotaChangeCheckPreconditions(t *testing.T) { Resources: leafRes, }, parent, false, nil) assert.NilError(t, err) - usageExceededMaxQueue.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000}) + usageExceededMaxQueue.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000, "cpu": 2000}) usageEqualsMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{ Name: "leaf-usage-equals-max", Resources: leafRes, }, parent, false, nil) assert.NilError(t, err) - usageEqualsMaxQueue.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000}) + usageEqualsMaxQueue.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000, "cpu": 1000}) + + usageNotMatchingMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{ + Name: "leaf-usage-res-not-matching-max-res", + Resources: configs.Resources{ + Max: map[string]string{"cpu": "1000"}, + }, + }, parent, false, nil) + assert.NilError(t, err) + usageNotMatchingMaxQueue.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000}) testCases := []struct { name string queue *Queue preconditionResult bool }{ - {"parent queue", parent, false}, + {"parent queue", parent, true}, {"leaf queue", leaf, false}, {"dynamic leaf queue", dynamicLeaf, false}, {"leaf queue, already preemption process started or running", alreadyPreemptionRunning, false}, {"leaf queue, usage exceeded max resources", usageExceededMaxQueue, true}, {"leaf queue, usage equals max resources", usageEqualsMaxQueue, false}, + {"leaf queue, usage res not matching max resources", usageNotMatchingMaxQueue, false}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -116,8 +127,7 @@ func TestQuotaChangeGetPreemptableResource(t *testing.T) { {"nil used", leaf, resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000}), nil, nil}, {"used below max", leaf, resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000}), resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 500}), nil}, {"used above max", leaf, resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000}), resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1500}), resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 500})}, - {"used above max in specific res type", leaf, resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000, "cpu": 10}), resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1500}), resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 500})}, - {"used above max and below max in specific res type", leaf, resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000, "cpu": 10}), resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1500, "cpu": 10}), resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 500})}, + {"used above max, below max, equals max in specific res type and also with extra res types", leaf, resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000, "cpu": 10, "gpu": 10}), resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1500, "cpu": 10, "gpu": 9}), resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 500})}, {"used res type but max undefined", leaf, resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000}), resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 150}), nil}, } for _, tc := range testCases { @@ -267,7 +277,119 @@ func TestQuotaChangeTryPreemption(t *testing.T) { } } -// TestQuotaChangeGetLeafQueuesPreemptableResource Test leaf queues distribution from parent's preemptable resources under different circumstances +func TestQuotaChangeTryPreemptionWithDifferentResTypes(t *testing.T) { + leaf, err := NewConfiguredQueue(configs.QueueConfig{ + Name: "leaf", + }, nil, false, nil) + assert.NilError(t, err) + + node := NewNode(&si.NodeInfo{ + NodeID: "node", + Attributes: nil, + SchedulableResource: &si.Resource{ + Resources: map[string]*si.Quantity{"first": {Value: 100}, "second": {Value: 200}}, + }, + }) + + suitableVictims := make([]*Allocation, 0) + overflowVictims := make([]*Allocation, 0) + oversizedVictims := make([]*Allocation, 0) + + suitableVictims = append(suitableVictims, createVictim(t, "ask1", node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 10}))) + suitableVictims = append(suitableVictims, createVictim(t, "ask2", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 10}))) + + oversizedVictims = append(oversizedVictims, createVictim(t, "ask21", node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9, "second": 10}))) + oversizedVictims = append(oversizedVictims, createVictim(t, "ask3", node, 3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11, "second": 10}))) + + overflowVictims = append(overflowVictims, createVictim(t, "ask4", node, 3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5, "second": 10}))) + overflowVictims = append(overflowVictims, createVictim(t, "ask41", node, 2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 6, "second": 10}))) + overflowVictims = append(overflowVictims, createVictim(t, "ask42", node, 1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9, "second": 10}))) + + oldMax := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20}) + newMax := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}) + newMaxWithNewResTypes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "third": 10}) + newMaxWithRemovedResTypes := resources.NewResourceFromMap(map[string]resources.Quantity{"second": 10}) + lowerGuaranteed := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) + lowerGuaranteedWithNewResTypes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5, "fourth": 5}) + + type test struct { + allocs []*Allocation + totalExpectedVictims int + expectedVictimsCount int + } + + testCases := []struct { + name string + queue *Queue + oldMax *resources.Resource + newMax *resources.Resource + guaranteed *resources.Resource + victims []test + }{ + {"oversized victims available with extra resource types", leaf, oldMax, newMax, nil, + []test{ + {oversizedVictims, 2, 1}, + }, + }, + {"suitable victims available with extra resource types other than defined in max", leaf, oldMax, newMax, nil, + []test{ + {suitableVictims, 2, 1}, + }, + }, + {"suitable victims available with extra resource types other than defined in max", leaf, nil, newMax, nil, + []test{ + {suitableVictims, 2, 1}, + }, + }, + {"suitable victims available with extra resource types other than defined in guaranteed", leaf, nil, newMax, lowerGuaranteed, + []test{ + {suitableVictims, 2, 1}, + }, + }, + {"suitable victims available - different res types, adding new res type in max", leaf, oldMax, newMaxWithNewResTypes, nil, + []test{ + {suitableVictims, 2, 1}, + }, + }, + {"suitable victims available - different res types, removing existing res type from max", leaf, oldMax, newMaxWithRemovedResTypes, nil, + []test{ + {suitableVictims, 2, 1}, + }, + }, + {"overflow victims available with extra resource types other than defined in guaranteed and vice versa", leaf, oldMax, newMax, lowerGuaranteedWithNewResTypes, + []test{ + {overflowVictims, 3, 2}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + for _, v := range tc.victims { + leaf.maxResource = tc.oldMax + leaf.guaranteedResource = tc.guaranteed + asks := v.allocs + assignAllocationsToQueue(asks, leaf) + leaf.maxResource = tc.newMax + leaf.guaranteedResource = tc.guaranteed + preemptor := NewQuotaChangePreemptor(tc.queue) + preemptor.tryPreemption() + assert.Equal(t, len(preemptor.getVictims()), v.totalExpectedVictims) + var victimsCount int + for _, a := range asks { + if a.IsPreempted() { + victimsCount++ + } + } + assert.Equal(t, victimsCount, v.expectedVictimsCount) + removeAllocationAsks(node, asks) + resetQueue(leaf) + } + }) + } +} + +// TestQuotaChangeGetChildQueuesPreemptableResource Test child queues distribution from parent's preemptable resources under different circumstances // Queue Structure: // parent // @@ -281,7 +403,7 @@ func TestQuotaChangeTryPreemption(t *testing.T) { // leaf22 // leaf3 (No usage) // leaf4 (Guaranteed set but equals usage) -func TestQuotaChangeGetLeafQueuesPreemptableResource(t *testing.T) { +func TestQuotaChangeGetChildQueuesPreemptableResource(t *testing.T) { parentConfig := configs.QueueConfig{Name: "parent", Parent: true} parent, err := NewConfiguredQueue(parentConfig, nil, false, nil) assert.NilError(t, err) @@ -338,6 +460,77 @@ func TestQuotaChangeGetLeafQueuesPreemptableResource(t *testing.T) { } } +// TestQuotaChangeGetChildQueuesPreemptableResourceWithDifferentResTypes Test child queues distribution from parent's preemptable resources under different circumstances with different resource types +// Queue Structure: +// parent +// +// leaf 1 (Guaranteed set for this hierarchy) +// leaf11 +// leaf111 +// leaf12 +// leaf2 (Guaranteed not set for this hierarchy) +// leaf21 +// leaf211 +// leaf22 +// leaf3 (No usage) +// leaf4 (Guaranteed set but equals usage) +func TestQuotaChangeGetChildQueuesPreemptableResourceWithDifferentResTypes(t *testing.T) { + parentConfig := configs.QueueConfig{Name: "parent", Parent: true} + parent, err := NewConfiguredQueue(parentConfig, nil, false, nil) + assert.NilError(t, err) + + leaf111, leaf12, leaf211, leaf22, leaf4 := createQueueSetups(t, parent, configs.Resources{Guaranteed: map[string]string{"first": "10", "third": "10"}}, configs.Resources{}) + + parent.GetChildQueue("leaf1").allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50, "second": 50}) + parent.GetChildQueue("leaf2").allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 80}) + leaf4.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 10}) + parent.GetChildQueue("leaf1").GetChildQueue("leaf11").allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 30, "second": 30}) + parent.GetChildQueue("leaf1").GetChildQueue("leaf12").allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20, "second": 20}) + parent.GetChildQueue("leaf2").GetChildQueue("leaf21").allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50}) + leaf22.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 30}) + leaf111.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 30, "second": 30}) + leaf211.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50}) + + testCases := []struct { + name string + parentQueue *Queue + parentPreemptable *resources.Resource + leaf111PRes *resources.Resource + leaf12PRes *resources.Resource + leaf211PRes *resources.Resource + leaf22PRes *resources.Resource + }{ + {"normal preemptable resources - normal distribution", parent, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100, "fourth": 100}), + resources.NewResourceFromMap(map[string]resources.Quantity{"first": 22}), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}), + resources.NewResourceFromMap(map[string]resources.Quantity{"first": 42}), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 25})}, + + {"twice the preemptable resources - twice the normal distribution", parent, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 200, "fourth": 100}), + resources.NewResourceFromMap(map[string]resources.Quantity{"first": 45}), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 22}), + resources.NewResourceFromMap(map[string]resources.Quantity{"first": 83}), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})}, + + {"half the preemptable resources - half the normal distribution", parent, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50, "fourth": 100}), + resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 6}), + resources.NewResourceFromMap(map[string]resources.Quantity{"first": 21}), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 12})}, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + childQueues := make(map[*Queue]*resources.Resource) + getChildQueuesPreemptableResource(tc.parentQueue, tc.parentPreemptable, childQueues) + assert.Equal(t, len(childQueues), 4) + assert.Equal(t, resources.Equals(childQueues[leaf111], tc.leaf111PRes), true) + assert.Equal(t, resources.Equals(childQueues[leaf12], tc.leaf12PRes), true) + assert.Equal(t, resources.Equals(childQueues[leaf211], tc.leaf211PRes), true) + assert.Equal(t, resources.Equals(childQueues[leaf22], tc.leaf22PRes), true) + if _, ok := childQueues[parent.GetChildQueue("leaf3")]; ok { + t.Fatal("leaf 3 queue exists") + } + if _, ok := childQueues[parent.GetChildQueue("leaf4")]; ok { + t.Fatal("leaf 4 queue exists") + } + }) + } +} + func TestQuotaChangeTryPreemptionForParentQueue(t *testing.T) { node := NewNode(&si.NodeInfo{ NodeID: "node", --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
