This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 7511f305 [YUNIKORN-3131] Improve DaemonSet preemption diagnostic messages (#1032)
7511f305 is described below
commit 7511f30539c781b30568047df20a8127b0278260
Author: Peter Bacsko <[email protected]>
AuthorDate: Thu Oct 16 15:47:09 2025 +0200
[YUNIKORN-3131] Improve DaemonSet preemption diagnostic messages (#1032)
Closes: #1032
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/scheduler/objects/application.go | 22 ++++++-
pkg/scheduler/objects/queue_test.go | 6 +-
pkg/scheduler/objects/required_node_preemptor.go | 29 ++++++++-
.../objects/required_node_preemptor_test.go | 71 +++++++++++++++-------
4 files changed, 99 insertions(+), 29 deletions(-)
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index b2c54318..db7929f9 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -56,6 +56,8 @@ var (
)
var initAppLogOnce sync.Once
var rateLimitedAppLog *log.RateLimitedLogger
+var initReqNodeLogOnce sync.Once
+var rateLimitedReqNodeLog *log.RateLimitedLogger
const (
Soft string = "Soft"
@@ -1382,7 +1384,7 @@ func (sa *Application) tryPreemption(headRoom
*resources.Resource, preemptionDel
func (sa *Application) tryRequiredNodePreemption(reserve *reservation, ask
*Allocation) bool {
// try preemption and see if we can free up resource
preemptor := NewRequiredNodePreemptor(reserve.node, ask)
- preemptor.filterAllocations()
+ result := preemptor.filterAllocations()
preemptor.sortAllocations()
// Are there any victims/asks to preempt?
@@ -1390,12 +1392,14 @@ func (sa *Application)
tryRequiredNodePreemption(reserve *reservation, ask *Allo
if len(victims) > 0 {
log.Log(log.SchedApplication).Info("Found victims for required
node preemption",
zap.String("ds allocation key", ask.GetAllocationKey()),
+ zap.String("allocation name", ask.GetAllocationName()),
zap.Int("no.of victims", len(victims)))
for _, victim := range victims {
if victimQueue :=
sa.queue.FindQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
victimQueue.IncPreemptingResource(victim.GetAllocatedResource())
}
victim.MarkPreempted()
+
victim.SendPreemptedBySchedulerEvent(ask.GetAllocationKey(),
ask.GetApplicationID(), sa.ApplicationID)
}
ask.MarkTriggeredPreemption()
sa.notifyRMAllocationReleased(victims,
si.TerminationType_PREEMPTED_BY_SCHEDULER,
@@ -1404,6 +1408,15 @@ func (sa *Application) tryRequiredNodePreemption(reserve
*reservation, ask *Allo
}
ask.LogAllocationFailure(common.NoVictimForRequiredNode, true)
ask.SendRequiredNodePreemptionFailedEvent(reserve.node.NodeID)
+ getRateLimitedReqNodeLog().Info("no victim found for required node
preemption",
+ zap.String("allocation key", ask.GetAllocationKey()),
+ zap.String("allocation name", ask.GetAllocationName()),
+ zap.String("node", reserve.node.NodeID),
+ zap.Int("total allocations", result.totalAllocations),
+ zap.Int("requiredNode allocations",
result.requiredNodeAllocations),
+ zap.Int("allocations already preempted",
result.alreadyPreemptedAllocations),
+ zap.Int("higher priority allocations",
result.higherPriorityAllocations),
+ zap.Int("allocations with non-matching resources",
result.atLeastOneResNotMatched))
return false
}
@@ -2183,6 +2196,13 @@ func getRateLimitedAppLog() *log.RateLimitedLogger {
return rateLimitedAppLog
}
+func getRateLimitedReqNodeLog() *log.RateLimitedLogger {
+ initReqNodeLogOnce.Do(func() {
+ rateLimitedReqNodeLog =
log.NewRateLimitedLogger(log.SchedApplication, time.Minute)
+ })
+ return rateLimitedReqNodeLog
+}
+
func (sa *Application) updateRunnableStatus(runnableInQueue,
runnableByUserLimit bool) {
sa.Lock()
defer sa.Unlock()
diff --git a/pkg/scheduler/objects/queue_test.go
b/pkg/scheduler/objects/queue_test.go
index 2132e901..0933ae54 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -1873,9 +1873,9 @@ func TestFindEligiblePreemptionVictims(t *testing.T) {
parentGuar := map[string]string{siCommon.Memory: "100"}
ask := createAllocationAsk("ask1", appID1, true, true, 0, res)
ask2 := createAllocationAsk("ask2", appID2, true, true, -1000, res)
- alloc2 := createAllocation("ask2", appID2, nodeID1, true, true, -1000,
res)
+ alloc2 := createAllocation("ask2", appID2, nodeID1, true, true, -1000,
false, res)
ask3 := createAllocationAsk("ask3", appID2, true, true, -1000, res)
- alloc3 := createAllocation("ask3", appID2, nodeID1, true, true, -1000,
res)
+ alloc3 := createAllocation("ask3", appID2, nodeID1, true, true, -1000,
false, res)
root, err := createRootQueue(map[string]string{siCommon.Memory: "1000"})
assert.NilError(t, err, "failed to create queue")
parent1, err := createManagedQueueGuaranteed(root, "parent1", true,
parentMax, parentGuar)
@@ -1985,7 +1985,7 @@ func TestFindEligiblePreemptionVictims(t *testing.T) {
assert.Equal(t, alloc3.allocationKey,
victims(snapshot)[0].allocationKey, "wrong alloc")
// recreate alloc2 to restore non-prempted state
app2.RemoveAllocation(alloc2.GetAllocationKey(),
si.TerminationType_STOPPED_BY_RM)
- alloc2 = createAllocation("ask2", appID2, nodeID1, true, true, -1000,
res)
+ alloc2 = createAllocation("ask2", appID2, nodeID1, true, true, -1000,
false, res)
app2.AddAllocation(alloc2)
// setting priority offset on parent2 queue should remove leaf2 victims
diff --git a/pkg/scheduler/objects/required_node_preemptor.go
b/pkg/scheduler/objects/required_node_preemptor.go
index 821a7ea4..6d111b47 100644
--- a/pkg/scheduler/objects/required_node_preemptor.go
+++ b/pkg/scheduler/objects/required_node_preemptor.go
@@ -30,6 +30,14 @@ type PreemptionContext struct {
allocations []*Allocation
}
+type filteringResult struct {
+ totalAllocations int // total number of allocations
+ requiredNodeAllocations int // number of requiredNode (daemon set)
allocations that cannot be preempted
+ atLeastOneResNotMatched int // number of allocations where there's
no single resource type that would match
+ higherPriorityAllocations int // number of allocations with higher
priority
+ alreadyPreemptedAllocations int // number of allocations already
preempted
+}
+
func NewRequiredNodePreemptor(node *Node, requiredAsk *Allocation)
*PreemptionContext {
preemptor := &PreemptionContext{
node: node,
@@ -39,15 +47,26 @@ func NewRequiredNodePreemptor(node *Node, requiredAsk
*Allocation) *PreemptionCo
return preemptor
}
-func (p *PreemptionContext) filterAllocations() {
- for _, allocation := range p.node.GetYunikornAllocations() {
+func (p *PreemptionContext) filterAllocations() filteringResult {
+ var result filteringResult
+ yunikornAllocations := p.node.GetYunikornAllocations()
+ result.totalAllocations = len(yunikornAllocations)
+
+ for _, allocation := range yunikornAllocations {
// skip daemon set pods and higher priority allocation
- if allocation.GetRequiredNode() != "" ||
allocation.GetPriority() > p.requiredAsk.GetPriority() {
+ if allocation.GetRequiredNode() != "" {
+ result.requiredNodeAllocations++
+ continue
+ }
+
+ if allocation.GetPriority() > p.requiredAsk.GetPriority() {
+ result.higherPriorityAllocations++
continue
}
// skip if the allocation is already being preempted
if allocation.IsPreempted() {
+ result.alreadyPreemptedAllocations++
continue
}
@@ -61,8 +80,12 @@ func (p *PreemptionContext) filterAllocations() {
}
if includeAllocation {
p.allocations = append(p.allocations, allocation)
+ } else {
+ result.atLeastOneResNotMatched++
}
}
+
+ return result
}
// sort based on the following criteria in the specified order:
diff --git a/pkg/scheduler/objects/required_node_preemptor_test.go
b/pkg/scheduler/objects/required_node_preemptor_test.go
index 500e76e0..c35d8deb 100644
--- a/pkg/scheduler/objects/required_node_preemptor_test.go
+++ b/pkg/scheduler/objects/required_node_preemptor_test.go
@@ -25,6 +25,7 @@ import (
"gotest.tools/v3/assert"
"github.com/apache/yunikorn-core/pkg/common/resources"
+ siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -44,7 +45,7 @@ func createAllocationAsk(allocationKey string, app string,
allowPreemption bool,
return ask
}
-func createAllocation(allocationKey string, app string, nodeID string,
allowPreemption bool, isOriginator bool, priority int32, res
*resources.Resource) *Allocation {
+func createAllocation(allocationKey string, app string, nodeID string,
allowPreemption bool, isOriginator bool, priority int32, requiredNode bool, res
*resources.Resource) *Allocation {
tags := map[string]string{}
siAsk := &si.Allocation{
AllocationKey: allocationKey,
@@ -57,6 +58,9 @@ func createAllocation(allocationKey string, app string,
nodeID string, allowPree
AllocationTags: tags,
NodeID: nodeID,
}
+ if requiredNode {
+ tags[siCommon.DomainYuniKorn+siCommon.KeyRequiredNode] = nodeID
+ }
ask := NewAllocationFromSI(siAsk)
return ask
}
@@ -67,58 +71,58 @@ func prepareAllocationAsks(t *testing.T, node *Node)
[]*Allocation {
result := make([]*Allocation, 0)
// regular pods
- alloc1 := createAllocation("ask1", "app1", node.NodeID, true, false, 10,
+ alloc1 := createAllocation("ask1", "app1", node.NodeID, true, false,
10, false,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
assert.Assert(t, node.TryAddAllocation(alloc1))
result = append(result, alloc1)
- alloc2 := createAllocation("ask2", "app1", node.NodeID, true, false, 10,
+ alloc2 := createAllocation("ask2", "app1", node.NodeID, true, false,
10, false,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}))
alloc2.createTime = createTime
assert.Assert(t, node.TryAddAllocation(alloc2))
result = append(result, alloc2)
- alloc3 := createAllocation("ask3", "app1", node.NodeID, true, false, 15,
+ alloc3 := createAllocation("ask3", "app1", node.NodeID, true, false,
15, false,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
assert.Assert(t, node.TryAddAllocation(alloc3))
result = append(result, alloc3)
- alloc4 := createAllocation("ask4", "app1", node.NodeID, true, false, 10,
+ alloc4 := createAllocation("ask4", "app1", node.NodeID, true, false,
10, false,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
alloc4.createTime = createTime
assert.Assert(t, node.TryAddAllocation(alloc4))
result = append(result, alloc4)
- alloc5 := createAllocation("ask5", "app1", node.NodeID, true, false, 5,
+ alloc5 := createAllocation("ask5", "app1", node.NodeID, true, false, 5,
false,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
assert.Assert(t, node.TryAddAllocation(alloc5))
result = append(result, alloc5)
// opted out pods
- alloc6 := createAllocation("ask6", "app1", node.NodeID, false, false,
10,
+ alloc6 := createAllocation("ask6", "app1", node.NodeID, false, false,
10, false,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
assert.Assert(t, node.TryAddAllocation(alloc6))
result = append(result, alloc6)
- alloc7 := createAllocation("ask7", "app1", node.NodeID, false, false,
10,
+ alloc7 := createAllocation("ask7", "app1", node.NodeID, false, false,
10, false,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}))
alloc7.createTime = createTime
assert.Assert(t, node.TryAddAllocation(alloc7))
result = append(result, alloc7)
- alloc8 := createAllocation("ask8", "app1", node.NodeID, false, false,
15,
+ alloc8 := createAllocation("ask8", "app1", node.NodeID, false, false,
15, false,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
assert.Assert(t, node.TryAddAllocation(alloc8))
result = append(result, alloc8)
// driver/owner pods
- alloc9 := createAllocation("ask9", "app1", node.NodeID, false, true, 10,
+ alloc9 := createAllocation("ask9", "app1", node.NodeID, false, true,
10, false,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
alloc9.createTime = createTime
assert.Assert(t, node.TryAddAllocation(alloc9))
result = append(result, alloc9)
- alloc10 := createAllocation("ask10", "app1", node.NodeID, true, true, 5,
+ alloc10 := createAllocation("ask10", "app1", node.NodeID, true, true,
5, false,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
assert.Assert(t, node.TryAddAllocation(alloc10))
result = append(result, alloc10)
@@ -201,7 +205,8 @@ func TestFilterAllocations(t *testing.T) {
resources.NewResourceFromMap(map[string]resources.Quantity{"second": 5}))
p := NewRequiredNodePreemptor(node, requiredAsk)
asks := prepareAllocationAsks(t, node)
- p.filterAllocations()
+ result := p.filterAllocations()
+ verifyFilterResult(t, 10, 0, 10, 0, 0, result)
filteredAllocations := p.getAllocations()
// allocations are not even considered as there is no match. of course,
no victims
@@ -213,7 +218,8 @@ func TestFilterAllocations(t *testing.T) {
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
p1 := NewRequiredNodePreemptor(node, requiredAsk1)
asks = prepareAllocationAsks(t, node)
- p1.filterAllocations()
+ result = p1.filterAllocations()
+ verifyFilterResult(t, 10, 0, 0, 10, 0, result)
filteredAllocations = p.getAllocations()
// allocations are not even considered as there is no match. of course,
no victims
@@ -225,27 +231,39 @@ func TestFilterAllocations(t *testing.T) {
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
p2 := NewRequiredNodePreemptor(node, requiredAsk2)
asks = prepareAllocationAsks(t, node)
- p2.filterAllocations()
+ result = p2.filterAllocations()
+ verifyFilterResult(t, 10, 0, 0, 0, 0, result)
filteredAllocations = p2.getAllocations()
assert.Equal(t, len(filteredAllocations), 10)
removeAllocationAsks(node, asks)
- // case 4: allocation has preempted
+ // case 4: allocation has been preempted
requiredAsk3 := createAllocationAsk("ask12", "app1", true, true, 20,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
p3 := NewRequiredNodePreemptor(node, requiredAsk3)
asks = prepareAllocationAsks(t, node)
- p3.filterAllocations()
+ node.GetAllocation("ask5").MarkPreempted() // "ask5" would be the first
and only victim without this
+ result = p3.filterAllocations()
p3.sortAllocations()
- victims := p3.GetVictims()
- for _, victim := range victims {
- victim.MarkPreempted()
- }
- p3.filterAllocations()
+ verifyFilterResult(t, 10, 0, 0, 0, 1, result)
filteredAllocations = p3.getAllocations()
- assert.Equal(t, len(filteredAllocations), 19)
+ assert.Equal(t, len(filteredAllocations), 9) // "ask5" is no longer
considered as a victim
removeAllocationAsks(node, asks)
+
+ // case 5: existing required node allocation
+ requiredAsk4 := createAllocationAsk("ask12", "app1", true, true, 20,
+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+ p4 := NewRequiredNodePreemptor(node, requiredAsk4)
+ _ = prepareAllocationAsks(t, node)
+ allocReqNode := createAllocation("ask11", "app1", node.NodeID, true,
true, 5, true,
+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+ assert.Assert(t, node.TryAddAllocation(allocReqNode))
+
+ result = p4.filterAllocations()
+ verifyFilterResult(t, 11, 1, 0, 0, 0, result)
+ filteredAllocations = p4.getAllocations()
+ assert.Equal(t, len(filteredAllocations), 10)
}
func TestGetVictims(t *testing.T) {
@@ -305,3 +323,12 @@ func TestGetVictims(t *testing.T) {
assert.Equal(t, len(victims3), 0)
removeAllocationAsks(node, asks)
}
+
+func verifyFilterResult(t *testing.T, totalAllocations,
requiredNodeAllocations, resourceNotEnough, higherPriorityAllocations,
alreadyPreemptedAllocations int, result filteringResult) {
+ t.Helper()
+ assert.Equal(t, totalAllocations, result.totalAllocations)
+ assert.Equal(t, requiredNodeAllocations, result.requiredNodeAllocations)
+ assert.Equal(t, resourceNotEnough, result.atLeastOneResNotMatched)
+ assert.Equal(t, higherPriorityAllocations,
result.higherPriorityAllocations)
+ assert.Equal(t, alreadyPreemptedAllocations,
result.alreadyPreemptedAllocations)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]