This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new e003b4cf [YUNIKORN-3246] Summarize the preemption results and send
event to the queue (#1079)
e003b4cf is described below
commit e003b4cfc741e649c8fb5c6097a770e2e1e0c9ed
Author: mani <[email protected]>
AuthorDate: Wed Apr 8 11:53:53 2026 +0530
[YUNIKORN-3246] Summarize the preemption results and send event to the
queue (#1079)
Closes: #1079
Signed-off-by: mani <[email protected]>
---
go.mod | 2 +-
go.sum | 2 +
pkg/scheduler/objects/events/queue_events.go | 9 ++
pkg/scheduler/objects/events/queue_events_test.go | 24 +++++
pkg/scheduler/objects/quota_preemptor.go | 51 +++++++++-
pkg/scheduler/objects/quota_preemptor_test.go | 109 ++++++++++++++--------
6 files changed, 154 insertions(+), 43 deletions(-)
diff --git a/go.mod b/go.mod
index 23e2f7c3..8c07eed7 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@ module github.com/apache/yunikorn-core
go 1.25.0
require (
- github.com/apache/yunikorn-scheduler-interface
v0.0.0-20260323095809-c1eb5e00dd66
+ github.com/apache/yunikorn-scheduler-interface
v0.0.0-20260408050357-2858f4d09dc7
github.com/go-ldap/ldap/v3 v3.4.13
github.com/google/btree v1.1.3
github.com/google/go-cmp v0.7.0
diff --git a/go.sum b/go.sum
index 2dc223e6..b6751597 100644
--- a/go.sum
+++ b/go.sum
@@ -4,6 +4,8 @@ github.com/alexbrainman/sspi v0.0.0-20250919150558-7d374ff0d59e
h1:4dAU9FXIyQktp
github.com/alexbrainman/sspi v0.0.0-20250919150558-7d374ff0d59e/go.mod
h1:cEWa1LVoE5KvSD9ONXsZrj0z6KqySlCCNKHlLzbqAt4=
github.com/apache/yunikorn-scheduler-interface
v0.0.0-20260323095809-c1eb5e00dd66
h1:6Gab4OugMMyaRc1Q2xCJq2Sk7EFWLu78USx53uXCj2c=
github.com/apache/yunikorn-scheduler-interface
v0.0.0-20260323095809-c1eb5e00dd66/go.mod
h1:RccSRsvQM5CAnKPEhlFhj5BwJHABCgECTnaMEjtX9sM=
+github.com/apache/yunikorn-scheduler-interface
v0.0.0-20260408050357-2858f4d09dc7
h1:Vs2FKxwKjJjUHWWsVq1VPl8JobBwbtDJ/Ti4lfPGF6U=
+github.com/apache/yunikorn-scheduler-interface
v0.0.0-20260408050357-2858f4d09dc7/go.mod
h1:RccSRsvQM5CAnKPEhlFhj5BwJHABCgECTnaMEjtX9sM=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0
h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
diff --git a/pkg/scheduler/objects/events/queue_events.go
b/pkg/scheduler/objects/events/queue_events.go
index f33e0066..7861ceed 100644
--- a/pkg/scheduler/objects/events/queue_events.go
+++ b/pkg/scheduler/objects/events/queue_events.go
@@ -104,6 +104,15 @@ func (q *QueueEvents) SendTypeChangedEvent(queuePath
string, isLeaf bool) {
q.eventSystem.AddEvent(event)
}
+// SendQuotaPreemptionEvent records a queue event of change type SET and
+// detail QUEUE_PREEMPTION for queuePath. The results text becomes the event
+// message and maxResource is attached as the event resource. The call is a
+// no-op while the event system reports that tracking is disabled.
+func (q *QueueEvents) SendQuotaPreemptionEvent(queuePath string, results string, maxResource *resources.Resource) {
+	if q.eventSystem.IsEventTrackingEnabled() {
+		q.eventSystem.AddEvent(events.CreateQueueEventRecord(
+			queuePath,
+			results,
+			common.Empty,
+			si.EventRecord_SET,
+			si.EventRecord_QUEUE_PREEMPTION,
+			maxResource,
+		))
+	}
+}
+
func NewQueueEvents(evt events.EventSystem) *QueueEvents {
return &QueueEvents{
eventSystem: evt,
diff --git a/pkg/scheduler/objects/events/queue_events_test.go
b/pkg/scheduler/objects/events/queue_events_test.go
index 8e3e6069..85661ce8 100644
--- a/pkg/scheduler/objects/events/queue_events_test.go
+++ b/pkg/scheduler/objects/events/queue_events_test.go
@@ -201,3 +201,27 @@ func TestSendGuaranteedResourceChangedEvent(t *testing.T) {
protoRes := resources.NewResourceFromProto(event.Resource)
assert.DeepEqual(t, guaranteed, protoRes)
}
+
+// TestSendQuotaPreemptionEvent exercises SendQuotaPreemptionEvent against both
+// mock event systems: the one built by NewEventSystemDisabled must stay empty,
+// the one built by NewEventSystem must hold exactly one fully populated record.
+func TestSendQuotaPreemptionEvent(t *testing.T) {
+	summary := "Quota Preemption results summary: preemptable resources: "
+	quota := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
+
+	// disabled mock: the call must not record anything
+	disabled := mock.NewEventSystemDisabled()
+	NewQueueEvents(disabled).SendQuotaPreemptionEvent(testQueuePath, summary, quota)
+	assert.Equal(t, 0, len(disabled.Events), "unexpected event")
+
+	// regular mock: exactly one record is expected, check every field of it
+	enabled := mock.NewEventSystem()
+	NewQueueEvents(enabled).SendQuotaPreemptionEvent(testQueuePath, summary, quota)
+	assert.Equal(t, 1, len(enabled.Events), "event was not generated")
+
+	record := enabled.Events[0]
+	assert.Equal(t, si.EventRecord_QUEUE, record.Type)
+	assert.Equal(t, testQueuePath, record.ObjectID)
+	assert.Equal(t, common.Empty, record.ReferenceID)
+	assert.Equal(t, summary, record.Message)
+	assert.Equal(t, si.EventRecord_SET, record.EventChangeType)
+	assert.Equal(t, si.EventRecord_QUEUE_PREEMPTION, record.EventChangeDetail)
+	assert.Equal(t, 1, len(record.Resource.Resources))
+	assert.DeepEqual(t, quota, resources.NewResourceFromProto(record.Resource))
+}
diff --git a/pkg/scheduler/objects/quota_preemptor.go
b/pkg/scheduler/objects/quota_preemptor.go
index 1eedf827..bdc326ae 100644
--- a/pkg/scheduler/objects/quota_preemptor.go
+++ b/pkg/scheduler/objects/quota_preemptor.go
@@ -20,6 +20,7 @@ package objects
import (
"math"
+ "strconv"
"go.uber.org/zap"
@@ -36,6 +37,13 @@ type QuotaPreemptionContext struct {
preemptingResource *resources.Resource
preemptableResource *resources.Resource
allocations []*Allocation
+ results *QuotaPreemptionResults
+}
+
+// QuotaPreemptionResults collects the outcome of one quota preemption attempt
+// on a QuotaPreemptionContext. tryPreemption reads it to build the result log
+// lines and the summary sent as a queue event.
+type QuotaPreemptionResults struct {
+ // selectedVictims is set by preemptVictims to the context's full allocations list.
+ selectedVictims []*Allocation
+ // preemptedVictims is assigned inside preemptVictims' per-application loop.
+ // NOTE(review): it is overwritten rather than appended, so with several
+ // applications it appears to keep only the last one's victims - confirm.
+ preemptedVictims []*Allocation
+ // claimedResource is set by preemptVictims to its victimsTotalResource value;
+ // it stays nil when no application has victims.
+ claimedResource *resources.Resource
}
func NewQuotaPreemptor(queue *Queue) *QuotaPreemptionContext {
@@ -47,6 +55,7 @@ func NewQuotaPreemptor(queue *Queue) *QuotaPreemptionContext {
preemptingResource: queue.GetPreemptingResource(),
preemptableResource: nil,
allocations: make([]*Allocation, 0),
+ results: &QuotaPreemptionResults{},
}
}
@@ -56,6 +65,16 @@ func (qpc *QuotaPreemptionContext) tryPreemption() {
if qpc.queue.IsLeafQueue() {
qpc.tryPreemptionInternal()
+ log.Log(log.SchedQuotaChangePreemption).Info("quota preemption
results for leaf queue",
+ zap.String("queue", qpc.queue.GetQueuePath()),
+ zap.String("preemptable resources planned",
qpc.preemptableResource.String()),
+ zap.String("claimed resources",
qpc.results.claimedResource.String()),
+ zap.Int("selected victims",
len(qpc.results.selectedVictims)),
+ zap.Int("preempted victims",
len(qpc.results.preemptedVictims)))
+ if len(qpc.allocations) > 0 {
+ summary := "Quota Preemption results summary:
preemptable resources: " + qpc.preemptableResource.String() + ", claimed
resources: " + qpc.results.claimedResource.String() + ", selected victims: " +
strconv.Itoa(len(qpc.results.selectedVictims)) + ", preempted victims: " +
strconv.Itoa(len(qpc.results.preemptedVictims))
+
qpc.queue.queueEvents.SendQuotaPreemptionEvent(qpc.queue.QueuePath, summary,
qpc.maxResource)
+ }
return
}
leafQueues := make(map[*Queue]*QuotaPreemptionContext)
@@ -67,8 +86,35 @@ func (qpc *QuotaPreemptionContext) tryPreemption() {
zap.Int("no. of leaf queues with potential victims",
len(leafQueues)),
)
+ totalSelectedVictims := 0
+ totalPreemptedVictims := 0
+ totalClaimedResources := resources.NewResource()
for _, leafContext := range leafQueues {
leafContext.tryPreemptionInternal()
+
+ totalSelectedVictims += len(leafContext.results.selectedVictims)
+ totalPreemptedVictims +=
len(leafContext.results.preemptedVictims)
+ totalClaimedResources.AddTo(leafContext.results.claimedResource)
+ log.Log(log.SchedQuotaChangePreemption).Debug("child queue
quota preemption results",
+ zap.String("parent", qpc.queue.GetQueuePath()),
+ zap.String("parent preemptable resources planned",
qpc.preemptableResource.String()),
+ zap.String("queue", leafContext.queue.GetQueuePath()),
+ zap.String("preemptable resources planned",
leafContext.preemptableResource.String()),
+ zap.String("claimed resources",
leafContext.results.claimedResource.String()),
+ zap.Int("selected victims",
len(leafContext.results.selectedVictims)),
+ zap.Int("preempted victims",
len(leafContext.results.preemptedVictims)))
+ }
+ log.Log(log.SchedQuotaChangePreemption).Info("quota preemption results
for parent queue",
+ zap.String("parent", qpc.queue.GetQueuePath()),
+ zap.String("parent preemptable resources planned",
qpc.preemptableResource.String()),
+ zap.String("preemptable resources planned",
qpc.preemptableResource.String()),
+ zap.String("claimed resources", totalClaimedResources.String()),
+ zap.Int("selected victims", totalSelectedVictims),
+ zap.Int("preempted victims", totalPreemptedVictims))
+
+ if totalSelectedVictims > 0 {
+ summary := "Quota Preemption results summary: preemptable
resources: " + qpc.preemptableResource.String() + ", claimed resources: " +
totalClaimedResources.String() + ", selected victims: " +
strconv.Itoa(totalSelectedVictims) + ", preempted victims: " +
strconv.Itoa(totalPreemptedVictims)
+
qpc.queue.queueEvents.SendQuotaPreemptionEvent(qpc.queue.QueuePath, summary,
qpc.maxResource)
}
}
@@ -267,7 +313,7 @@ func (qpc *QuotaPreemptionContext) preemptVictims() {
}
apps := make(map[*Application][]*Allocation)
victimsTotalResource := resources.NewResource()
- log.Log(log.SchedQuotaChangePreemption).Info("Found victims for quota
change preemption",
+ log.Log(log.SchedQuotaChangePreemption).Debug("Found victims for quota
change preemption",
zap.String("queue", qpc.queue.GetQueuePath()),
zap.Int("total victims", len(qpc.allocations)),
zap.Stringer("max resources", qpc.maxResource),
@@ -276,6 +322,7 @@ func (qpc *QuotaPreemptionContext) preemptVictims() {
zap.Stringer("preemptable resources", qpc.preemptableResource),
zap.Bool("isGuaranteedSet", qpc.guaranteedResource.IsEmpty()),
)
+ qpc.results.selectedVictims = qpc.allocations
for _, victim := range qpc.allocations {
victimAlloc := victim.GetAllocatedResource()
if !qpc.preemptableResource.FitInMaxUndef(victimAlloc) {
@@ -301,6 +348,8 @@ func (qpc *QuotaPreemptionContext) preemptVictims() {
for app, victims := range apps {
if len(victims) > 0 {
+ qpc.results.claimedResource = victimsTotalResource
+ qpc.results.preemptedVictims = victims
for _, victim := range victims {
err := victim.MarkPreempted()
if err != nil {
diff --git a/pkg/scheduler/objects/quota_preemptor_test.go
b/pkg/scheduler/objects/quota_preemptor_test.go
index 8f7584e9..3612b004 100644
--- a/pkg/scheduler/objects/quota_preemptor_test.go
+++ b/pkg/scheduler/objects/quota_preemptor_test.go
@@ -19,6 +19,8 @@
package objects
import (
+ "strconv"
+ "strings"
"testing"
"time"
@@ -26,6 +28,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
+ "github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -136,6 +139,9 @@ func TestQuotaChangeFilterVictims(t *testing.T) {
}
func TestQuotaChangeTryPreemption(t *testing.T) {
+ events.Init()
+ eventSystem := events.GetEventSystem().(*events.EventSystemImpl)
//nolint:errcheck
+ eventSystem.StartServiceWithPublisher(false)
leaf, err := NewConfiguredQueue(configs.QueueConfig{
Name: "leaf",
}, nil, false, nil)
@@ -145,11 +151,12 @@ func TestQuotaChangeTryPreemption(t *testing.T) {
NodeID: "node",
Attributes: nil,
SchedulableResource: &si.Resource{
- Resources: map[string]*si.Quantity{"first": {Value:
100}},
+ Resources: map[string]*si.Quantity{"first": {Value:
200}},
},
})
suitableVictims := make([]*Allocation, 0)
+ notSuitableVictims := make([]*Allocation, 0)
oversizedVictims := make([]*Allocation, 0)
overflowVictims := make([]*Allocation, 0)
shortfallVictims := make([]*Allocation, 0)
@@ -169,6 +176,8 @@ func TestQuotaChangeTryPreemption(t *testing.T) {
shortfallVictims = append(shortfallVictims, createVictim(t, "ask52",
node, 2, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
5})))
shortfallVictims = append(shortfallVictims, createVictim(t, "ask53",
node, 1, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
4})))
+ notSuitableVictims = append(notSuitableVictims, createVictim(t, "ask6",
node, 3, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
11})))
+
oldMax :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20})
newMax :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
preemptable := newMax
@@ -182,16 +191,18 @@ func TestQuotaChangeTryPreemption(t *testing.T) {
guaranteed *resources.Resource
preemptableResource *resources.Resource
victims []*Allocation
+ claimedResource *resources.Resource
totalExpectedVictims int
expectedVictimsCount int
}{
- {"no victims available", leaf, oldMax, newMax, nil,
preemptable, []*Allocation{}, 0, 0},
- {"suitable victims available", leaf, oldMax, newMax, nil,
preemptable, suitableVictims, 2, 1},
- {"skip over sized victims", leaf, oldMax, newMax, nil,
preemptable, oversizedVictims, 2, 1},
- {"guaranteed not set", leaf, oldMax, newMax, nil, preemptable,
overflowVictims, 3, 1},
- {"guaranteed set but lower than max", leaf, oldMax, newMax,
lowerGuaranteed, preemptable, overflowVictims, 3, 1},
- {"best effort - guaranteed set and equals max", leaf, oldMax,
newMax, guaranteed, preemptable, shortfallVictims, 4, 2},
- {"best effort - guaranteed set, max not set earlier but now",
leaf, nil, newMax, guaranteed, preemptable, shortfallVictims, 4, 2},
+ {"no victims available", leaf, oldMax, newMax, nil,
preemptable, []*Allocation{}, nil, 0, 0},
+ {"suitable victims available", leaf, oldMax, newMax, nil,
preemptable, suitableVictims, preemptable, 2, 1},
+ {"victims available but none is suitable ", leaf, oldMax,
newMax, nil,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}),
notSuitableVictims, nil, 1, 0},
+ {"skip over sized victims", leaf, oldMax, newMax, nil,
preemptable, oversizedVictims,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9}), 2, 1},
+ {"guaranteed not set", leaf, oldMax, newMax, nil, preemptable,
overflowVictims,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}), 3, 1},
+ {"guaranteed set but lower than max", leaf, oldMax, newMax,
lowerGuaranteed, preemptable, overflowVictims,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}), 3, 1},
+ {"best effort - guaranteed set and equals max", leaf, oldMax,
newMax, guaranteed, preemptable, shortfallVictims,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9}), 4, 2},
+ {"best effort - guaranteed set, max not set earlier but now",
leaf, nil, newMax, guaranteed, preemptable, shortfallVictims,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9}), 4, 2},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@@ -212,8 +223,14 @@ func TestQuotaChangeTryPreemption(t *testing.T) {
}
}
assert.Equal(t, victimsCount, tc.expectedVictimsCount)
+
+ time.Sleep(500 * time.Millisecond)
+ assertQuotaPreemptionEvent(t, tc.totalExpectedVictims,
"Quota Preemption results summary: preemptable resources:
"+tc.preemptableResource.String()+", claimed resources:
"+tc.claimedResource.String()+", selected victims:
"+strconv.Itoa(tc.totalExpectedVictims)+", preempted victims:
"+strconv.Itoa(tc.expectedVictimsCount), eventSystem.Store.CollectEvents())
removeAllocationAsks(node, asks)
resetQueue(leaf)
+
+ // clear the events sent later after earlier collection
+ _ = eventSystem.Store.CollectEvents()
})
}
}
@@ -473,6 +490,9 @@ func
TestQuotaChangeGetChildQueuesPreemptableResourceWithDifferentResTypes(t *te
}
func TestQuotaChangeTryPreemptionForParentQueue(t *testing.T) {
+ events.Init()
+ eventSystem := events.GetEventSystem().(*events.EventSystemImpl)
//nolint:errcheck
+ eventSystem.StartServiceWithPublisher(false)
node := NewNode(&si.NodeInfo{
NodeID: "node",
Attributes: nil,
@@ -480,41 +500,33 @@ func TestQuotaChangeTryPreemptionForParentQueue(t
*testing.T) {
Resources: map[string]*si.Quantity{"first": {Value:
500}},
},
})
-
- parentConfig := configs.QueueConfig{Name: "parent", Parent: true}
- parent, err := NewConfiguredQueue(parentConfig, nil, false, nil)
+ parent, err := NewConfiguredQueue(configs.QueueConfig{Name: "parent",
Parent: true}, nil, false, nil)
assert.NilError(t, err)
-
- parentConfig1 := configs.QueueConfig{Name: "parent1", Parent: true}
- parent1, err := NewConfiguredQueue(parentConfig1, nil, false, nil)
+ parent1, err := NewConfiguredQueue(configs.QueueConfig{Name: "parent1",
Parent: true}, nil, false, nil)
assert.NilError(t, err)
-
- parentConfig2 := configs.QueueConfig{Name: "parent2", Parent: true}
- parent2, err := NewConfiguredQueue(parentConfig2, nil, false, nil)
+ parent2, err := NewConfiguredQueue(configs.QueueConfig{Name: "parent2",
Parent: true}, nil, false, nil)
assert.NilError(t, err)
leaf111G, leaf12G, leaf211G, leaf22G, leaf4G := createQueueSetups(t,
parent, configs.Resources{Guaranteed: map[string]string{"first": "10"}},
configs.Resources{})
leaf111, leaf12, leaf211, leaf22, leaf4 := createQueueSetups(t,
parent1, configs.Resources{}, configs.Resources{})
-
leaf111WithParentG, leaf12WithParentG, leaf211WithParentG,
leaf22WithParentG, leaf4WithParentG := createQueueSetups(t, parent2,
configs.Resources{}, configs.Resources{})
- suitableVictims := make([]*Allocation, 0)
+ var suitableVictims, notSuitableVictims, suitableVictims1,
suitableVictims2, suitableVictims3 []*Allocation
suitableVictims = append(suitableVictims, createVictim(t, "ask1", node,
5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})))
suitableVictims = append(suitableVictims, createVictim(t, "ask2", node,
4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})))
suitableVictims = append(suitableVictims, createVictim(t, "ask3", node,
4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})))
+ notSuitableVictims = append(notSuitableVictims, createVictim(t,
"ask3_1", node, 4,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11})))
- leafGVictims := make(map[*Queue][]*Allocation)
+ leafGVictims, leafGNotSuitableVictims, leafVictims,
leafVictimsWithParentG := make(map[*Queue][]*Allocation),
make(map[*Queue][]*Allocation), make(map[*Queue][]*Allocation),
make(map[*Queue][]*Allocation)
leafGVictims[leaf111G] = suitableVictims
- leafVictims := make(map[*Queue][]*Allocation)
+ leafGNotSuitableVictims[leaf111G] = notSuitableVictims
leafVictims[leaf111] = suitableVictims
- suitableVictims1 := make([]*Allocation, 0)
suitableVictims1 = append(suitableVictims1, createVictim(t, "ask4",
node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10})))
suitableVictims1 = append(suitableVictims1, createVictim(t, "ask5",
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10})))
leafGVictims[leaf12G] = suitableVictims1
leafVictims[leaf12] = suitableVictims1
- suitableVictims2 := make([]*Allocation, 0)
suitableVictims2 = append(suitableVictims2, createVictim(t, "ask6",
node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10})))
suitableVictims2 = append(suitableVictims2, createVictim(t, "ask7",
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10})))
suitableVictims2 = append(suitableVictims2, createVictim(t, "ask8",
node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10})))
@@ -523,18 +535,15 @@ func TestQuotaChangeTryPreemptionForParentQueue(t
*testing.T) {
leafGVictims[leaf211G] = suitableVictims2
leafVictims[leaf211] = suitableVictims2
- suitableVictims3 := make([]*Allocation, 0)
suitableVictims3 = append(suitableVictims3, createVictim(t, "ask11",
node, 5, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10})))
suitableVictims3 = append(suitableVictims3, createVictim(t, "ask12",
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10})))
suitableVictims3 = append(suitableVictims3, createVictim(t, "ask13",
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10})))
leafGVictims[leaf22G] = suitableVictims3
leafVictims[leaf22] = suitableVictims3
- v := createVictim(t, "ask14", node, 5,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
- leafGVictims[leaf4G] = []*Allocation{v}
- leafVictims[leaf4] = []*Allocation{v}
+ leafGVictims[leaf4G] = []*Allocation{createVictim(t, "ask14", node, 5,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))}
+ leafVictims[leaf4] = []*Allocation{createVictim(t, "ask14", node, 5,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))}
- leafVictimsWithParentG := make(map[*Queue][]*Allocation)
leafVictimsWithParentG[leaf111WithParentG] =
[]*Allocation{createVictim(t, "ask15", node, 4,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 13})),
createVictim(t, "ask16", node, 4,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 12}))}
leafVictimsWithParentG[leaf12WithParentG] =
[]*Allocation{createVictim(t, "ask17", node, 4,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 13})),
@@ -545,25 +554,25 @@ func TestQuotaChangeTryPreemptionForParentQueue(t
*testing.T) {
createVictim(t, "ask22", node, 4,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 12}))}
leafVictimsWithParentG[leaf4WithParentG] =
[]*Allocation{createVictim(t, "ask23", node, 4,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 13})),
createVictim(t, "ask24", node, 4,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 12}))}
-
oldMax :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 130})
newMax :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
oldMax1 :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 150})
- newMax1 :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20})
- newMax2 :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
testCases := []struct {
- name string
- queue *Queue
- oldMax *resources.Resource
- newMax *resources.Resource
- victims map[*Queue][]*Allocation
- expectedVictims int
+ name string
+ queue *Queue
+ oldMax *resources.Resource
+ newMax *resources.Resource
+ victims map[*Queue][]*Allocation
+ claimedResources *resources.Resource
+ totalVictims int
+ expectedVictims int
}{
- {"Guaranteed set on one side of queue hierarchy - suitable
victims available", parent, oldMax, newMax, leafGVictims, 11},
- {"Guaranteed set not set on any queue - suitable victims
available", parent1, oldMax, newMax, leafVictims, 9},
- {"Guaranteed set only on parent queue but not on any child
queues underneath - suitable victims available", parent2, oldMax1, newMax1,
leafVictimsWithParentG, 5},
- {"Guaranteed set only on parent queue but not on any child
queues underneath - suitable victims available", parent2, oldMax1, newMax2,
leafVictimsWithParentG, 5},
+ {"Guaranteed set on one side of queue hierarchy - suitable
victims available", parent, oldMax, newMax, leafGVictims,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 110}), 13,
11},
+ {"Guaranteed set on one side of queue hierarchy - victims
available but none suitable", parent, oldMax, newMax, leafGNotSuitableVictims,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 110}), 1,
0},
+ {"Guaranteed set not set on any queue - suitable victims
available", parent1, oldMax, newMax, leafVictims,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 90}), 14,
9},
+ {"Guaranteed set only on parent queue but not on any child
queues underneath - suitable victims available", parent2, oldMax1,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20}),
leafVictimsWithParentG,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 60}), 10,
5},
+ {"Guaranteed set only on parent queue but not on any child
queues underneath - suitable victims available", parent2, oldMax1,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}),
leafVictimsWithParentG,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 60}), 10,
5},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@@ -573,6 +582,7 @@ func TestQuotaChangeTryPreemptionForParentQueue(t
*testing.T) {
}
tc.queue.maxResource = tc.newMax
tc.queue.guaranteedResource = tc.newMax
+ preemptableResource :=
resources.SubOnlyExisting(tc.newMax, tc.queue.allocatedResource)
preemptor := NewQuotaPreemptor(tc.queue)
preemptor.tryPreemption()
victimsCount := 0
@@ -584,14 +594,31 @@ func TestQuotaChangeTryPreemptionForParentQueue(t
*testing.T) {
}
}
assert.Equal(t, victimsCount, tc.expectedVictims)
+ time.Sleep(500 * time.Millisecond)
+ assertQuotaPreemptionEvent(t, tc.expectedVictims,
"Quota Preemption results summary: preemptable resources:
"+resources.Multiply(preemptableResource, -1).String()+", claimed resources:
"+tc.claimedResources.String()+", selected victims:
"+strconv.Itoa(tc.totalVictims)+", preempted victims:
"+strconv.Itoa(tc.expectedVictims), eventSystem.Store.CollectEvents())
for _, v := range tc.victims {
removeAllocationAsks(node, v)
}
resetQueue(tc.queue)
+
+ // clear the events sent later after earlier collection
+ _ = eventSystem.Store.CollectEvents()
})
}
}
+// assertQuotaPreemptionEvent checks the most recent entry of records against
+// the expected quota preemption outcome.
+//
+// When victims is positive, the last record must be a QUEUE event with change
+// type SET, change detail QUEUE_PREEMPTION and a message equal to results.
+// Otherwise the last record, if there is one, must not carry a quota
+// preemption summary message.
+//
+// An empty records slice is handled explicitly: it passes when no event is
+// expected and fails with a readable message (instead of an index-out-of-range
+// panic) when one is.
+func assertQuotaPreemptionEvent(t *testing.T, victims int, results string, records []*si.EventRecord) {
+	t.Helper()
+	if len(records) == 0 {
+		assert.Assert(t, victims <= 0, "expected a quota preemption event but no events were collected")
+		return
+	}
+	last := records[len(records)-1]
+	if victims > 0 {
+		assert.Equal(t, si.EventRecord_QUEUE, last.Type)
+		assert.Equal(t, si.EventRecord_SET, last.EventChangeType)
+		assert.Equal(t, si.EventRecord_QUEUE_PREEMPTION, last.EventChangeDetail)
+		assert.Equal(t, results, last.Message)
+		return
+	}
+	assert.Assert(t, !strings.Contains(last.Message, "Quota Preemption results summary"))
+}
+
// createQueueSetups Creates a queue hierarchy
// Queue Structure:
// parent
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]