This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new cefba883 [YUNIKORN-3155] Leaf Queue Selection process when parent
queue quota lowers (#1051)
cefba883 is described below
commit cefba883ac89a2e228a82daa65b8adfc920ff172
Author: mani <[email protected]>
AuthorDate: Thu Nov 27 18:38:48 2025 +0530
[YUNIKORN-3155] Leaf Queue Selection process when parent queue quota lowers
(#1051)
Closes: #1051
Signed-off-by: mani <[email protected]>
---
pkg/common/resources/resources.go | 10 +-
pkg/common/resources/resources_test.go | 2 +-
pkg/log/logger.go | 62 +++---
pkg/scheduler/objects/quota_change_preemptor.go | 123 +++++++++--
.../objects/quota_change_preemptor_test.go | 236 +++++++++++++++++++++
5 files changed, 384 insertions(+), 49 deletions(-)
diff --git a/pkg/common/resources/resources.go
b/pkg/common/resources/resources.go
index 3c51f061..ced4bc81 100644
--- a/pkg/common/resources/resources.go
+++ b/pkg/common/resources/resources.go
@@ -542,7 +542,7 @@ func getFairShare(allocated, guaranteed, fair *Resource)
float64 {
// Get the share of each resource quantity when compared to the total
// resources quantity
// NOTE: shares can be negative and positive in the current assumptions
-func getShares(res, total *Resource) []float64 {
+func GetShares(res, total *Resource) []float64 {
// shortcut if the passed in resource to get the share on is nil or
empty (sparse)
if res == nil || len(res.Resources) == 0 {
return make([]float64, 0)
@@ -592,8 +592,8 @@ func getShares(res, total *Resource) []float64 {
// 1 if the left share is larger
// -1 if the right share is larger
func CompUsageRatio(left, right, total *Resource) int {
- lshares := getShares(left, total)
- rshares := getShares(right, total)
+ lshares := GetShares(left, total)
+ rshares := GetShares(right, total)
return compareShares(lshares, rshares)
}
@@ -622,8 +622,8 @@ func CompUsageRatioSeparately(leftAllocated,
leftGuaranteed, leftFairMax, rightA
// highest share for right resource from total.
// If highest share for the right resource is 0 fairness is 1
func FairnessRatio(left, right, total *Resource) float64 {
- lshares := getShares(left, total)
- rshares := getShares(right, total)
+ lshares := GetShares(left, total)
+ rshares := GetShares(right, total)
// Get the largest value from the shares
lshare := float64(0)
diff --git a/pkg/common/resources/resources_test.go
b/pkg/common/resources/resources_test.go
index a5f20793..4375a076 100644
--- a/pkg/common/resources/resources_test.go
+++ b/pkg/common/resources/resources_test.go
@@ -1614,7 +1614,7 @@ func TestGetShares(t *testing.T) {
for _, tc := range tests {
t.Run(tc.message, func(t *testing.T) {
- shares := getShares(tc.res, tc.total)
+ shares := GetShares(tc.res, tc.total)
if !reflect.DeepEqual(shares, tc.expected) {
t.Errorf("incorrect shares for %s, expected %v
got: %v", tc.message, tc.expected, shares)
}
diff --git a/pkg/log/logger.go b/pkg/log/logger.go
index 6ef29fe9..bbfe3ba3 100644
--- a/pkg/log/logger.go
+++ b/pkg/log/logger.go
@@ -54,43 +54,43 @@ const (
// Defined loggers: when adding new loggers, ids must be sequential, and all
must be added to the loggers slice in the same order
var (
- Core = &LoggerHandle{id: 0, name: "core"}
- Test = &LoggerHandle{id: 1, name: "test"}
- Deprecation = &LoggerHandle{id: 2, name: "deprecation"}
- Config = &LoggerHandle{id: 3, name: "core.config"}
- Entrypoint = &LoggerHandle{id: 4, name:
"core.entrypoint"}
- Events = &LoggerHandle{id: 5, name: "core.events"}
- OpenTracing = &LoggerHandle{id: 6, name:
"core.opentracing"}
- Resources = &LoggerHandle{id: 7, name: "core.resources"}
- REST = &LoggerHandle{id: 8, name: "core.rest"}
- RMProxy = &LoggerHandle{id: 9, name: "core.rmproxy"}
- RPC = &LoggerHandle{id: 10, name: "core.rpc"}
- Metrics = &LoggerHandle{id: 11, name: "core.metrics"}
- Scheduler = &LoggerHandle{id: 12, name:
"core.scheduler"}
- SchedAllocation = &LoggerHandle{id: 13, name:
"core.scheduler.allocation"}
- SchedApplication = &LoggerHandle{id: 14, name:
"core.scheduler.application"}
- SchedAppUsage = &LoggerHandle{id: 15, name:
"core.scheduler.application.usage"}
- SchedContext = &LoggerHandle{id: 16, name:
"core.scheduler.context"}
- SchedFSM = &LoggerHandle{id: 17, name:
"core.scheduler.fsm"}
- SchedHealth = &LoggerHandle{id: 18, name:
"core.scheduler.health"}
- SchedNode = &LoggerHandle{id: 19, name:
"core.scheduler.node"}
- SchedPartition = &LoggerHandle{id: 20, name:
"core.scheduler.partition"}
- SchedPreemption = &LoggerHandle{id: 21, name:
"core.scheduler.preemption"}
- SchedQueue = &LoggerHandle{id: 22, name:
"core.scheduler.queue"}
- SchedReservation = &LoggerHandle{id: 23, name:
"core.scheduler.reservation"}
- SchedUGM = &LoggerHandle{id: 24, name:
"core.scheduler.ugm"}
- SchedNodesUsage = &LoggerHandle{id: 25, name:
"core.scheduler.nodesusage"}
- Security = &LoggerHandle{id: 26, name: "core.security"}
- Utils = &LoggerHandle{id: 27, name: "core.utils"}
- Diagnostics = &LoggerHandle{id: 28, name:
"core.diagnostics"}
- ShedQuotaChangePreemption = &LoggerHandle{id: 29, name:
"core.scheduler.preemption.quotachange"}
+ Core = &LoggerHandle{id: 0, name: "core"}
+ Test = &LoggerHandle{id: 1, name: "test"}
+ Deprecation = &LoggerHandle{id: 2, name: "deprecation"}
+ Config = &LoggerHandle{id: 3, name: "core.config"}
+ Entrypoint = &LoggerHandle{id: 4, name:
"core.entrypoint"}
+ Events = &LoggerHandle{id: 5, name: "core.events"}
+ OpenTracing = &LoggerHandle{id: 6, name:
"core.opentracing"}
+ Resources = &LoggerHandle{id: 7, name:
"core.resources"}
+ REST = &LoggerHandle{id: 8, name: "core.rest"}
+ RMProxy = &LoggerHandle{id: 9, name: "core.rmproxy"}
+ RPC = &LoggerHandle{id: 10, name: "core.rpc"}
+ Metrics = &LoggerHandle{id: 11, name: "core.metrics"}
+ Scheduler = &LoggerHandle{id: 12, name:
"core.scheduler"}
+ SchedAllocation = &LoggerHandle{id: 13, name:
"core.scheduler.allocation"}
+ SchedApplication = &LoggerHandle{id: 14, name:
"core.scheduler.application"}
+ SchedAppUsage = &LoggerHandle{id: 15, name:
"core.scheduler.application.usage"}
+ SchedContext = &LoggerHandle{id: 16, name:
"core.scheduler.context"}
+ SchedFSM = &LoggerHandle{id: 17, name:
"core.scheduler.fsm"}
+ SchedHealth = &LoggerHandle{id: 18, name:
"core.scheduler.health"}
+ SchedNode = &LoggerHandle{id: 19, name:
"core.scheduler.node"}
+ SchedPartition = &LoggerHandle{id: 20, name:
"core.scheduler.partition"}
+ SchedPreemption = &LoggerHandle{id: 21, name:
"core.scheduler.preemption"}
+ SchedQueue = &LoggerHandle{id: 22, name:
"core.scheduler.queue"}
+ SchedReservation = &LoggerHandle{id: 23, name:
"core.scheduler.reservation"}
+ SchedUGM = &LoggerHandle{id: 24, name:
"core.scheduler.ugm"}
+ SchedNodesUsage = &LoggerHandle{id: 25, name:
"core.scheduler.nodesusage"}
+ Security = &LoggerHandle{id: 26, name:
"core.security"}
+ Utils = &LoggerHandle{id: 27, name: "core.utils"}
+ Diagnostics = &LoggerHandle{id: 28, name:
"core.diagnostics"}
+ SchedQuotaChangePreemption = &LoggerHandle{id: 29, name:
"core.scheduler.preemption.quotachange"}
)
// this tracks all the known logger handles, used to preallocate the real
logger instances when configuration changes
var loggers = []*LoggerHandle{
Core, Test, Deprecation, Config, Entrypoint, Events, OpenTracing,
Resources, REST, RMProxy, RPC, Metrics,
Scheduler, SchedAllocation, SchedApplication, SchedAppUsage,
SchedContext, SchedFSM, SchedHealth, SchedNode,
- SchedPartition, SchedPreemption, SchedQueue, SchedReservation,
SchedUGM, SchedNodesUsage, Security, Utils, Diagnostics,
ShedQuotaChangePreemption,
+ SchedPartition, SchedPreemption, SchedQueue, SchedReservation,
SchedUGM, SchedNodesUsage, Security, Utils, Diagnostics,
SchedQuotaChangePreemption,
}
// structure to hold all current logger configuration state
diff --git a/pkg/scheduler/objects/quota_change_preemptor.go
b/pkg/scheduler/objects/quota_change_preemptor.go
index 018247ed..3e70d166 100644
--- a/pkg/scheduler/objects/quota_change_preemptor.go
+++ b/pkg/scheduler/objects/quota_change_preemptor.go
@@ -54,18 +54,47 @@ func (qcp *QuotaChangePreemptionContext)
CheckPreconditions() bool {
if !qcp.queue.IsLeafQueue() || !qcp.queue.IsManaged() ||
qcp.queue.IsQuotaChangePreemptionRunning() {
return false
}
- if
qcp.maxResource.StrictlyGreaterThanOnlyExisting(qcp.queue.GetAllocatedResource())
{
+ if
qcp.maxResource.StrictlyGreaterThanOrEqualsOnlyExisting(qcp.queue.GetAllocatedResource())
{
return false
}
return true
}
func (qcp *QuotaChangePreemptionContext) tryPreemption() {
+ // Get Preemptable Resource
+ preemptableResource := qcp.getPreemptableResources()
+
+ if !qcp.queue.IsLeafQueue() {
+ leafQueues := make(map[*Queue]*resources.Resource)
+ getChildQueuesPreemptableResource(qcp.queue,
preemptableResource, leafQueues)
+
+ log.Log(log.SchedQuotaChangePreemption).Info("Triggering quota
change preemption for parent queue",
+ zap.String("parent queue", qcp.queue.GetQueuePath()),
+ zap.String("preemptable resource",
preemptableResource.String()),
+ zap.Any("no. of leaf queues with potential victims",
len(leafQueues)),
+ )
+
+ for leaf, leafPreemptableResource := range leafQueues {
+ leafQueueQCPC := NewQuotaChangePreemptor(leaf)
+
log.Log(log.SchedQuotaChangePreemption).Info("Triggering quota change
preemption for leaf queue",
+ zap.String("leaf queue", leaf.GetQueuePath()),
+ zap.String("max resource",
leafQueueQCPC.maxResource.String()),
+ zap.String("guaranteed resource",
leafQueueQCPC.guaranteedResource.String()),
+ zap.String("actual allocated resource",
leafQueueQCPC.allocatedResource.String()),
+ zap.String("preemptable resource distribution",
leafPreemptableResource.String()),
+ )
+
leafQueueQCPC.tryPreemptionInternal(leafPreemptableResource)
+ }
+ } else {
+ qcp.tryPreemptionInternal(preemptableResource)
+ }
+}
+
+func (qcp *QuotaChangePreemptionContext)
tryPreemptionInternal(preemptableResource *resources.Resource) {
// quota change preemption has started, so mark the flag
qcp.queue.MarkQuotaChangePreemptionRunning(true)
- // Get Preemptable Resource
- qcp.preemptableResource = qcp.getPreemptableResources()
+ qcp.preemptableResource = preemptableResource
// Filter the allocations
qcp.allocations = qcp.filterAllocations()
@@ -83,6 +112,72 @@ func (qcp *QuotaChangePreemptionContext) tryPreemption() {
qcp.queue.resetPreemptionSettings()
}
+// getChildQueuesPreemptableResource Compute leaf queue's preemptable resource
distribution from the parent's preemptable resource.
+// Start with the immediate children of the parent, compute each child's
distribution from its parent's preemptable resource, and repeat the same
+// for all children at all levels recursively until the leaf queues are
processed.
+
+// In order to achieve a fair distribution of the parent's preemptable
resource among its children,
+// the higher the (relative) usage is, the higher the resulting preemptable
resource will be.
+// Usage above guaranteed (if set) is only considered to derive the
preemptable resource.
+func getChildQueuesPreemptableResource(queue *Queue, parentPreemptableResource
*resources.Resource, childQueues map[*Queue]*resources.Resource) {
+ children := queue.GetCopyOfChildren()
+ if len(children) == 0 {
+ return
+ }
+
+ // Sum of all children preemptable resources
+ totalPreemptableResource := resources.NewResource()
+
+ // Preemptable resource of all children
+ childrenPreemptableResource := make(map[*Queue]*resources.Resource)
+
+	// Traverse each child and calculate its own preemptable resource.
The preemptable resource is the amount of resources used above the guaranteed
amount.
+	// If guaranteed is not set, the entire used resource is treated as
the preemptable resource.
+ // Total preemptable resource (sum of all children's preemptable
resources) would be calculated along the way.
+ for _, child := range children {
+ // Skip child if there is no usage or usage below or equals
guaranteed
+ if child.GetAllocatedResource().IsEmpty() ||
child.GetGuaranteedResource().StrictlyGreaterThanOrEqualsOnlyExisting(child.GetAllocatedResource())
{
+ continue
+ }
+ var usedResource *resources.Resource
+ if !child.GetGuaranteedResource().IsEmpty() {
+ usedResource =
resources.SubOnlyExisting(child.GetGuaranteedResource(),
child.GetAllocatedResource())
+ } else {
+ usedResource = child.GetAllocatedResource()
+ }
+ preemptableResource := resources.NewResource()
+ for k, v := range usedResource.Resources {
+ if v < 0 {
+ preemptableResource.Resources[k] = v * -1
+ } else {
+ preemptableResource.Resources[k] = v
+ }
+ }
+ childrenPreemptableResource[child] = preemptableResource
+ totalPreemptableResource.AddTo(preemptableResource)
+ }
+
+ // Second pass: Traverse each child and calculate percentage of each
resource type based on total preemptable resource.
+ // Apply percentage on parent's preemptable resource to derive its
individual distribution
+ // or share of resources to be preempted.
+ for c, pRes := range childrenPreemptableResource {
+ i := 0
+ childPreemptableResource := resources.NewResource()
+ per := resources.GetShares(pRes, totalPreemptableResource)
+ for k, v := range parentPreemptableResource.Resources {
+ // Need to be improved further
+ value := math.RoundToEven(per[i] * float64(v))
+ childPreemptableResource.Resources[k] =
resources.Quantity(value)
+ i++
+ }
+ if c.IsLeafQueue() {
+ childQueues[c] = childPreemptableResource
+ } else {
+ getChildQueuesPreemptableResource(c,
childPreemptableResource, childQueues)
+ }
+ }
+}
+
// getPreemptableResources Get the preemptable resources for the queue
// Subtracting the usage from the max resource gives the preemptable resources.
// It could contain both positive and negative values. Only negative values
are preemptable.
@@ -140,7 +235,7 @@ func (qcp *QuotaChangePreemptionContext)
filterAllocations() []*Allocation {
allocations = append(allocations, alloc)
}
}
- log.Log(log.ShedQuotaChangePreemption).Info("Filtering allocations",
+ log.Log(log.SchedQuotaChangePreemption).Info("Filtering allocations",
zap.String("queue", qcp.queue.GetQueuePath()),
zap.Int("filtered allocations", len(allocations)),
)
@@ -160,28 +255,32 @@ func (qcp *QuotaChangePreemptionContext)
sortAllocations() {
// Otherwise, exceeding above the required resources slightly is acceptable
for now.
func (qcp *QuotaChangePreemptionContext) preemptVictims() {
if len(qcp.allocations) == 0 {
+ log.Log(log.SchedQuotaChangePreemption).Warn("BUG: No victims
to enforce quota change through preemption",
+ zap.String("queue", qcp.queue.GetQueuePath()))
return
}
- log.Log(log.ShedQuotaChangePreemption).Info("Found victims for quota
change preemption",
- zap.String("queue", qcp.queue.GetQueuePath()),
- zap.Int("total victims", len(qcp.allocations)))
apps := make(map[*Application][]*Allocation)
victimsTotalResource := resources.NewResource()
- isGuaranteedAndMaxEquals := qcp.maxResource != nil &&
qcp.guaranteedResource != nil && resources.Equals(qcp.maxResource,
qcp.guaranteedResource)
- log.Log(log.ShedQuotaChangePreemption).Info("Found victims for quota
change preemption",
+ log.Log(log.SchedQuotaChangePreemption).Info("Found victims for quota
change preemption",
zap.String("queue", qcp.queue.GetQueuePath()),
zap.Int("total victims", len(qcp.allocations)),
zap.String("max resources", qcp.maxResource.String()),
zap.String("guaranteed resources",
qcp.guaranteedResource.String()),
zap.String("allocated resources",
qcp.allocatedResource.String()),
zap.String("preemptable resources",
qcp.preemptableResource.String()),
- zap.Bool("isGuaranteedSet", isGuaranteedAndMaxEquals),
+ zap.Bool("isGuaranteedSet", qcp.guaranteedResource.IsEmpty()),
)
for _, victim := range qcp.allocations {
if
!qcp.preemptableResource.FitInMaxUndef(victim.GetAllocatedResource()) {
continue
}
application := qcp.queue.GetApplication(victim.applicationID)
+ if application == nil {
+ log.Log(log.SchedQuotaChangePreemption).Warn("BUG:
application not found in queue",
+ zap.String("queue", qcp.queue.GetQueuePath()),
+ zap.String("application", victim.applicationID))
+ continue
+ }
// Keep collecting the victims until preemptable resource
reaches and subtract the usage
if
qcp.preemptableResource.StrictlyGreaterThanOnlyExisting(victimsTotalResource) {
@@ -191,7 +290,7 @@ func (qcp *QuotaChangePreemptionContext) preemptVictims() {
// Has usage gone below the guaranteed resources?
// If yes, revert the recently added victim steps completely
and try next victim.
- if isGuaranteedAndMaxEquals &&
!qcp.allocatedResource.StrictlyGreaterThanOnlyExisting(qcp.guaranteedResource) {
+ if !qcp.guaranteedResource.IsEmpty() &&
qcp.guaranteedResource.StrictlyGreaterThanOnlyExisting(qcp.allocatedResource) {
victims := apps[application]
exceptRecentlyAddedVictims := victims[:len(victims)-1]
apps[application] = exceptRecentlyAddedVictims
@@ -205,7 +304,7 @@ func (qcp *QuotaChangePreemptionContext) preemptVictims() {
for app, victims := range apps {
if len(victims) > 0 {
for _, victim := range victims {
-
log.Log(log.ShedQuotaChangePreemption).Info("Preempting victims for quota
change preemption",
+
log.Log(log.SchedQuotaChangePreemption).Info("Preempting victims for quota
change preemption",
zap.String("queue",
qcp.queue.GetQueuePath()),
zap.String("victim allocation key",
victim.allocationKey),
zap.String("victim allocated
resources", victim.GetAllocatedResource().String()),
diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go
b/pkg/scheduler/objects/quota_change_preemptor_test.go
index ec2f3e81..10ab82ab 100644
--- a/pkg/scheduler/objects/quota_change_preemptor_test.go
+++ b/pkg/scheduler/objects/quota_change_preemptor_test.go
@@ -71,6 +71,13 @@ func TestQuotaChangeCheckPreconditions(t *testing.T) {
assert.NilError(t, err)
usageExceededMaxQueue.allocatedResource =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000})
+ usageEqualsMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
+ Name: "leaf-usage-equals-max",
+ Resources: leafRes,
+ }, parent, false, nil)
+ assert.NilError(t, err)
+ usageEqualsMaxQueue.allocatedResource =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000})
+
testCases := []struct {
name string
queue *Queue
@@ -81,6 +88,7 @@ func TestQuotaChangeCheckPreconditions(t *testing.T) {
{"dynamic leaf queue", dynamicLeaf, false},
{"leaf queue, already preemption process started or running",
alreadyPreemptionRunning, false},
{"leaf queue, usage exceeded max resources",
usageExceededMaxQueue, true},
+ {"leaf queue, usage equals max resources", usageEqualsMaxQueue,
false},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@@ -259,6 +267,234 @@ func TestQuotaChangeTryPreemption(t *testing.T) {
}
}
+// TestQuotaChangeGetLeafQueuesPreemptableResource Test leaf queues
distribution from parent's preemptable resources under different circumstances
+// Queue Structure:
+// parent
+//
+// leaf 1 (Guaranteed set for this hierarchy)
+// leaf11
+// leaf111
+// leaf12
+// leaf2 (Guaranteed not set for this hierarchy)
+// leaf21
+// leaf211
+// leaf22
+// leaf3 (No usage)
+// leaf4 (Guaranteed set but equals usage)
+func TestQuotaChangeGetLeafQueuesPreemptableResource(t *testing.T) {
+ parentConfig := configs.QueueConfig{Name: "parent", Parent: true}
+ parent, err := NewConfiguredQueue(parentConfig, nil, false, nil)
+ assert.NilError(t, err)
+
+ leaf111, leaf12, leaf211, leaf22, leaf4 := createQueueSetups(t, parent,
configs.Resources{Guaranteed: map[string]string{"first": "10"}},
configs.Resources{})
+
+ parent.GetChildQueue("leaf1").allocatedResource =
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
+ parent.GetChildQueue("leaf2").allocatedResource =
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 80})
+ leaf4.allocatedResource =
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+ parent.GetChildQueue("leaf1").GetChildQueue("leaf11").allocatedResource
= resources.NewResourceFromMap(map[string]resources.Quantity{"first": 30})
+ parent.GetChildQueue("leaf1").GetChildQueue("leaf12").allocatedResource
= resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20})
+ parent.GetChildQueue("leaf2").GetChildQueue("leaf21").allocatedResource
= resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
+ leaf22.allocatedResource =
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 30})
+ leaf111.allocatedResource =
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 30})
+ leaf211.allocatedResource =
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
+
+ testCases := []struct {
+ name string
+ parentQueue *Queue
+ parentPreemptable *resources.Resource
+ leaf111PRes *resources.Resource
+ leaf12PRes *resources.Resource
+ leaf211PRes *resources.Resource
+ leaf22PRes *resources.Resource
+ }{
+ {"normal preemptable resources - normal distribution", parent,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100}),
+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 22}),
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}),
+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 42}),
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 25})},
+
+ {"twice the preemptable resources - twice the normal
distribution", parent,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 200}),
+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 45}),
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 22}),
+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 83}),
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})},
+
+ {"half the preemptable resources - half the normal
distribution", parent,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50}),
+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}),
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 6}),
+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 21}),
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 12})},
+ }
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ childQueues := make(map[*Queue]*resources.Resource)
+ getChildQueuesPreemptableResource(tc.parentQueue,
tc.parentPreemptable, childQueues)
+ assert.Equal(t, len(childQueues), 4)
+ assert.Equal(t, resources.Equals(childQueues[leaf111],
tc.leaf111PRes), true)
+ assert.Equal(t, resources.Equals(childQueues[leaf12],
tc.leaf12PRes), true)
+ assert.Equal(t, resources.Equals(childQueues[leaf211],
tc.leaf211PRes), true)
+ assert.Equal(t, resources.Equals(childQueues[leaf22],
tc.leaf22PRes), true)
+ if _, ok := childQueues[parent.GetChildQueue("leaf3")];
ok {
+ t.Fatal("leaf 3 queue exists")
+ }
+ if _, ok := childQueues[parent.GetChildQueue("leaf4")];
ok {
+ t.Fatal("leaf 4 queue exists")
+ }
+ })
+ }
+}
+
+func TestQuotaChangeTryPreemptionForParentQueue(t *testing.T) {
+ node := NewNode(&si.NodeInfo{
+ NodeID: "node",
+ Attributes: nil,
+ SchedulableResource: &si.Resource{
+ Resources: map[string]*si.Quantity{"first": {Value:
200}},
+ },
+ })
+
+ parentConfig := configs.QueueConfig{Name: "parent", Parent: true}
+ parent, err := NewConfiguredQueue(parentConfig, nil, false, nil)
+ assert.NilError(t, err)
+
+ parentConfig1 := configs.QueueConfig{Name: "parent1", Parent: true}
+ parent1, err := NewConfiguredQueue(parentConfig1, nil, false, nil)
+ assert.NilError(t, err)
+
+ leaf111G, leaf12G, leaf211G, leaf22G, leaf4G := createQueueSetups(t,
parent, configs.Resources{Guaranteed: map[string]string{"first": "10"}},
configs.Resources{})
+ leaf111, leaf12, leaf211, leaf22, leaf4 := createQueueSetups(t,
parent1, configs.Resources{}, configs.Resources{})
+
+ suitableVictims := make([]*Allocation, 0)
+ suitableVictims = append(suitableVictims, createVictim(t, "ask1", node,
5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})))
+ suitableVictims = append(suitableVictims, createVictim(t, "ask2", node,
4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})))
+ suitableVictims = append(suitableVictims, createVictim(t, "ask3", node,
4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})))
+
+ leafGVictims := make(map[*Queue][]*Allocation)
+ leafGVictims[leaf111G] = suitableVictims
+ leafVictims := make(map[*Queue][]*Allocation)
+ leafVictims[leaf111] = suitableVictims
+
+ suitableVictims1 := make([]*Allocation, 0)
+ suitableVictims1 = append(suitableVictims1, createVictim(t, "ask4",
node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10})))
+ suitableVictims1 = append(suitableVictims1, createVictim(t, "ask5",
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10})))
+ leafGVictims[leaf12G] = suitableVictims1
+ leafVictims[leaf12] = suitableVictims1
+
+ suitableVictims2 := make([]*Allocation, 0)
+ suitableVictims2 = append(suitableVictims2, createVictim(t, "ask6",
node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10})))
+ suitableVictims2 = append(suitableVictims2, createVictim(t, "ask7",
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10})))
+ suitableVictims2 = append(suitableVictims2, createVictim(t, "ask8",
node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10})))
+ suitableVictims2 = append(suitableVictims2, createVictim(t, "ask9",
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10})))
+ suitableVictims2 = append(suitableVictims2, createVictim(t, "ask10",
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10})))
+ leafGVictims[leaf211G] = suitableVictims2
+ leafVictims[leaf211] = suitableVictims2
+
+ suitableVictims3 := make([]*Allocation, 0)
+ suitableVictims3 = append(suitableVictims3, createVictim(t, "ask11",
node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10})))
+ suitableVictims3 = append(suitableVictims3, createVictim(t, "ask12",
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10})))
+ suitableVictims3 = append(suitableVictims3, createVictim(t, "ask13",
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10})))
+
+ leafGVictims[leaf22G] = suitableVictims3
+ leafVictims[leaf22] = suitableVictims3
+
+ v := createVictim(t, "ask14", node, 5,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
+ leafGVictims[leaf4G] = []*Allocation{v}
+ leafVictims[leaf4] = []*Allocation{v}
+
+ expectedGVictims := make(map[*Queue]int)
+ expectedGVictims[leaf111G] = 2
+ expectedGVictims[leaf12G] = 1
+ expectedGVictims[leaf211G] = 5
+ expectedGVictims[leaf22G] = 3
+
+ expectedVictims := make(map[*Queue]int)
+ expectedVictims[leaf111] = 3
+ expectedVictims[leaf12] = 2
+ expectedVictims[leaf211] = 5
+ expectedVictims[leaf22] = 3
+
+ oldMax :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 130})
+ newMax :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+
+ testCases := []struct {
+ name string
+ queue *Queue
+ oldMax *resources.Resource
+ newMax *resources.Resource
+ victims map[*Queue][]*Allocation
+ expectedVictims map[*Queue]int
+ }{
+ {"Guaranteed set on one side of queue hierarchy - suitable
victims available", parent, oldMax, newMax, leafGVictims, expectedGVictims},
+ {"Guaranteed set not set on any queue - suitable victims
available", parent1, oldMax, newMax, leafVictims, expectedVictims},
+ }
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ tc.queue.maxResource = tc.oldMax
+ for q, v := range tc.victims {
+ assignAllocationsToQueue(v, q)
+ }
+ tc.queue.maxResource = tc.newMax
+ preemptor := NewQuotaChangePreemptor(tc.queue)
+ preemptor.tryPreemption()
+ for q, asks := range tc.victims {
+ var victimsCount int
+ for _, a := range asks {
+ if a.IsPreempted() {
+ victimsCount++
+ }
+ }
+ assert.Equal(t, victimsCount,
tc.expectedVictims[q])
+ }
+ for _, v := range tc.victims {
+ removeAllocationAsks(node, v)
+ }
+ resetQueue(tc.queue)
+ })
+ }
+}
+
+// createQueueSetups Creates a queue hierarchy
+// Queue Structure:
+// parent
+//
+// leaf 1 (Guaranteed set/or not set for this hierarchy)
+// leaf11
+// leaf111
+// leaf12
+// leaf2 (Guaranteed not set for this hierarchy)
+// leaf21
+// leaf211
+// leaf22
+// leaf3
+// leaf4
+func createQueueSetups(t *testing.T, parent *Queue, leafResG
configs.Resources, leafRes configs.Resources) (*Queue, *Queue, *Queue, *Queue,
*Queue) {
+ leaf1, err := NewConfiguredQueue(configs.QueueConfig{Name: "leaf1",
Parent: true, Resources: leafResG}, parent, false, nil)
+ assert.NilError(t, err)
+
+ leaf2, err := NewConfiguredQueue(configs.QueueConfig{Name: "leaf2",
Parent: true, Resources: leafRes}, parent, false, nil)
+ assert.NilError(t, err)
+
+ _, err = NewConfiguredQueue(configs.QueueConfig{Name: "leaf3",
Resources: leafRes}, parent, false, nil)
+ assert.NilError(t, err)
+
+ leaf4, err := NewConfiguredQueue(configs.QueueConfig{Name: "leaf4",
Resources: leafResG}, parent, false, nil)
+ assert.NilError(t, err)
+
+ leaf11, err := NewConfiguredQueue(configs.QueueConfig{Name: "leaf11",
Parent: true, Resources: leafResG}, leaf1, false, nil)
+ assert.NilError(t, err)
+
+ leaf12, err := NewConfiguredQueue(configs.QueueConfig{Name: "leaf12",
Resources: leafResG}, leaf1, false, nil)
+ assert.NilError(t, err)
+
+ leaf21, err := NewConfiguredQueue(configs.QueueConfig{Name: "leaf21",
Parent: true, Resources: leafRes}, leaf2, false, nil)
+ assert.NilError(t, err)
+
+ leaf22, err := NewConfiguredQueue(configs.QueueConfig{Name: "leaf22",
Resources: leafRes}, leaf2, false, nil)
+ assert.NilError(t, err)
+
+ leaf111, err := NewConfiguredQueue(configs.QueueConfig{Name: "leaf111",
Resources: leafResG}, leaf11, false, nil)
+ assert.NilError(t, err)
+
+ leaf211, err := NewConfiguredQueue(configs.QueueConfig{Name: "leaf211",
Resources: leafRes}, leaf21, false, nil)
+ assert.NilError(t, err)
+
+ return leaf111, leaf12, leaf211, leaf22, leaf4
+}
+
func createVictim(t *testing.T, allocKey string, node *Node, adjustment int,
allocRes *resources.Resource) *Allocation {
createTime := time.Now()
allocation := createAllocation(allocKey, "app1", node.NodeID, true,
false, 10, false, allocRes)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]