This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 7511f305 [YUNIKORN-3131] Improve DaemonSet preemption diagnostic 
messages (#1032)
7511f305 is described below

commit 7511f30539c781b30568047df20a8127b0278260
Author: Peter Bacsko <[email protected]>
AuthorDate: Thu Oct 16 15:47:09 2025 +0200

    [YUNIKORN-3131] Improve DaemonSet preemption diagnostic messages (#1032)
    
    Closes: #1032
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/scheduler/objects/application.go               | 22 ++++++-
 pkg/scheduler/objects/queue_test.go                |  6 +-
 pkg/scheduler/objects/required_node_preemptor.go   | 29 ++++++++-
 .../objects/required_node_preemptor_test.go        | 71 +++++++++++++++-------
 4 files changed, 99 insertions(+), 29 deletions(-)

diff --git a/pkg/scheduler/objects/application.go 
b/pkg/scheduler/objects/application.go
index b2c54318..db7929f9 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -56,6 +56,8 @@ var (
 )
 var initAppLogOnce sync.Once
 var rateLimitedAppLog *log.RateLimitedLogger
+var initReqNodeLogOnce sync.Once
+var rateLimitedReqNodeLog *log.RateLimitedLogger
 
 const (
        Soft string = "Soft"
@@ -1382,7 +1384,7 @@ func (sa *Application) tryPreemption(headRoom 
*resources.Resource, preemptionDel
 func (sa *Application) tryRequiredNodePreemption(reserve *reservation, ask 
*Allocation) bool {
        // try preemption and see if we can free up resource
        preemptor := NewRequiredNodePreemptor(reserve.node, ask)
-       preemptor.filterAllocations()
+       result := preemptor.filterAllocations()
        preemptor.sortAllocations()
 
        // Are there any victims/asks to preempt?
@@ -1390,12 +1392,14 @@ func (sa *Application) 
tryRequiredNodePreemption(reserve *reservation, ask *Allo
        if len(victims) > 0 {
                log.Log(log.SchedApplication).Info("Found victims for required 
node preemption",
                        zap.String("ds allocation key", ask.GetAllocationKey()),
+                       zap.String("allocation name", ask.GetAllocationName()),
                        zap.Int("no.of victims", len(victims)))
                for _, victim := range victims {
                        if victimQueue := 
sa.queue.FindQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
                                
victimQueue.IncPreemptingResource(victim.GetAllocatedResource())
                        }
                        victim.MarkPreempted()
+                       
victim.SendPreemptedBySchedulerEvent(ask.GetAllocationKey(), 
ask.GetApplicationID(), sa.ApplicationID)
                }
                ask.MarkTriggeredPreemption()
                sa.notifyRMAllocationReleased(victims, 
si.TerminationType_PREEMPTED_BY_SCHEDULER,
@@ -1404,6 +1408,15 @@ func (sa *Application) tryRequiredNodePreemption(reserve 
*reservation, ask *Allo
        }
        ask.LogAllocationFailure(common.NoVictimForRequiredNode, true)
        ask.SendRequiredNodePreemptionFailedEvent(reserve.node.NodeID)
+       getRateLimitedReqNodeLog().Info("no victim found for required node 
preemption",
+               zap.String("allocation key", ask.GetAllocationKey()),
+               zap.String("allocation name", ask.GetAllocationName()),
+               zap.String("node", reserve.node.NodeID),
+               zap.Int("total allocations", result.totalAllocations),
+               zap.Int("requiredNode allocations", 
result.requiredNodeAllocations),
+               zap.Int("allocations already preempted", 
result.alreadyPreemptedAllocations),
+               zap.Int("higher priority allocations", 
result.higherPriorityAllocations),
+               zap.Int("allocations with non-matching resources", 
result.atLeastOneResNotMatched))
        return false
 }
 
@@ -2183,6 +2196,13 @@ func getRateLimitedAppLog() *log.RateLimitedLogger {
        return rateLimitedAppLog
 }
 
+func getRateLimitedReqNodeLog() *log.RateLimitedLogger {
+       initReqNodeLogOnce.Do(func() {
+               rateLimitedReqNodeLog = 
log.NewRateLimitedLogger(log.SchedApplication, time.Minute)
+       })
+       return rateLimitedReqNodeLog
+}
+
 func (sa *Application) updateRunnableStatus(runnableInQueue, 
runnableByUserLimit bool) {
        sa.Lock()
        defer sa.Unlock()
diff --git a/pkg/scheduler/objects/queue_test.go 
b/pkg/scheduler/objects/queue_test.go
index 2132e901..0933ae54 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -1873,9 +1873,9 @@ func TestFindEligiblePreemptionVictims(t *testing.T) {
        parentGuar := map[string]string{siCommon.Memory: "100"}
        ask := createAllocationAsk("ask1", appID1, true, true, 0, res)
        ask2 := createAllocationAsk("ask2", appID2, true, true, -1000, res)
-       alloc2 := createAllocation("ask2", appID2, nodeID1, true, true, -1000, 
res)
+       alloc2 := createAllocation("ask2", appID2, nodeID1, true, true, -1000, 
false, res)
        ask3 := createAllocationAsk("ask3", appID2, true, true, -1000, res)
-       alloc3 := createAllocation("ask3", appID2, nodeID1, true, true, -1000, 
res)
+       alloc3 := createAllocation("ask3", appID2, nodeID1, true, true, -1000, 
false, res)
        root, err := createRootQueue(map[string]string{siCommon.Memory: "1000"})
        assert.NilError(t, err, "failed to create queue")
        parent1, err := createManagedQueueGuaranteed(root, "parent1", true, 
parentMax, parentGuar)
@@ -1985,7 +1985,7 @@ func TestFindEligiblePreemptionVictims(t *testing.T) {
        assert.Equal(t, alloc3.allocationKey, 
victims(snapshot)[0].allocationKey, "wrong alloc")
        // recreate alloc2 to restore non-prempted state
        app2.RemoveAllocation(alloc2.GetAllocationKey(), 
si.TerminationType_STOPPED_BY_RM)
-       alloc2 = createAllocation("ask2", appID2, nodeID1, true, true, -1000, 
res)
+       alloc2 = createAllocation("ask2", appID2, nodeID1, true, true, -1000, 
false, res)
        app2.AddAllocation(alloc2)
 
        // setting priority offset on parent2 queue should remove leaf2 victims
diff --git a/pkg/scheduler/objects/required_node_preemptor.go 
b/pkg/scheduler/objects/required_node_preemptor.go
index 821a7ea4..6d111b47 100644
--- a/pkg/scheduler/objects/required_node_preemptor.go
+++ b/pkg/scheduler/objects/required_node_preemptor.go
@@ -30,6 +30,14 @@ type PreemptionContext struct {
        allocations []*Allocation
 }
 
+type filteringResult struct {
+       totalAllocations            int // total number of allocations
+       requiredNodeAllocations     int // number of requiredNode (daemon set) 
allocations that cannot be preempted
+       atLeastOneResNotMatched     int // number of allocations where there's 
no single resource type that would match
+       higherPriorityAllocations   int // number of allocations with higher 
priority
+       alreadyPreemptedAllocations int // number of allocations already 
preempted
+}
+
 func NewRequiredNodePreemptor(node *Node, requiredAsk *Allocation) 
*PreemptionContext {
        preemptor := &PreemptionContext{
                node:        node,
@@ -39,15 +47,26 @@ func NewRequiredNodePreemptor(node *Node, requiredAsk 
*Allocation) *PreemptionCo
        return preemptor
 }
 
-func (p *PreemptionContext) filterAllocations() {
-       for _, allocation := range p.node.GetYunikornAllocations() {
+func (p *PreemptionContext) filterAllocations() filteringResult {
+       var result filteringResult
+       yunikornAllocations := p.node.GetYunikornAllocations()
+       result.totalAllocations = len(yunikornAllocations)
+
+       for _, allocation := range yunikornAllocations {
                // skip daemon set pods and higher priority allocation
-               if allocation.GetRequiredNode() != "" || 
allocation.GetPriority() > p.requiredAsk.GetPriority() {
+               if allocation.GetRequiredNode() != "" {
+                       result.requiredNodeAllocations++
+                       continue
+               }
+
+               if allocation.GetPriority() > p.requiredAsk.GetPriority() {
+                       result.higherPriorityAllocations++
                        continue
                }
 
                // skip if the allocation is already being preempted
                if allocation.IsPreempted() {
+                       result.alreadyPreemptedAllocations++
                        continue
                }
 
@@ -61,8 +80,12 @@ func (p *PreemptionContext) filterAllocations() {
                }
                if includeAllocation {
                        p.allocations = append(p.allocations, allocation)
+               } else {
+                       result.atLeastOneResNotMatched++
                }
        }
+
+       return result
 }
 
 // sort based on the following criteria in the specified order:
diff --git a/pkg/scheduler/objects/required_node_preemptor_test.go 
b/pkg/scheduler/objects/required_node_preemptor_test.go
index 500e76e0..c35d8deb 100644
--- a/pkg/scheduler/objects/required_node_preemptor_test.go
+++ b/pkg/scheduler/objects/required_node_preemptor_test.go
@@ -25,6 +25,7 @@ import (
        "gotest.tools/v3/assert"
 
        "github.com/apache/yunikorn-core/pkg/common/resources"
+       siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
@@ -44,7 +45,7 @@ func createAllocationAsk(allocationKey string, app string, 
allowPreemption bool,
        return ask
 }
 
-func createAllocation(allocationKey string, app string, nodeID string, 
allowPreemption bool, isOriginator bool, priority int32, res 
*resources.Resource) *Allocation {
+func createAllocation(allocationKey string, app string, nodeID string, 
allowPreemption bool, isOriginator bool, priority int32, requiredNode bool, res 
*resources.Resource) *Allocation {
        tags := map[string]string{}
        siAsk := &si.Allocation{
                AllocationKey:    allocationKey,
@@ -57,6 +58,9 @@ func createAllocation(allocationKey string, app string, 
nodeID string, allowPree
                AllocationTags:   tags,
                NodeID:           nodeID,
        }
+       if requiredNode {
+               tags[siCommon.DomainYuniKorn+siCommon.KeyRequiredNode] = nodeID
+       }
        ask := NewAllocationFromSI(siAsk)
        return ask
 }
@@ -67,58 +71,58 @@ func prepareAllocationAsks(t *testing.T, node *Node) 
[]*Allocation {
        result := make([]*Allocation, 0)
 
        // regular pods
-       alloc1 := createAllocation("ask1", "app1", node.NodeID, true, false, 10,
+       alloc1 := createAllocation("ask1", "app1", node.NodeID, true, false, 
10, false,
                
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
        assert.Assert(t, node.TryAddAllocation(alloc1))
        result = append(result, alloc1)
 
-       alloc2 := createAllocation("ask2", "app1", node.NodeID, true, false, 10,
+       alloc2 := createAllocation("ask2", "app1", node.NodeID, true, false, 
10, false,
                
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}))
        alloc2.createTime = createTime
        assert.Assert(t, node.TryAddAllocation(alloc2))
        result = append(result, alloc2)
 
-       alloc3 := createAllocation("ask3", "app1", node.NodeID, true, false, 15,
+       alloc3 := createAllocation("ask3", "app1", node.NodeID, true, false, 
15, false,
                
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
        assert.Assert(t, node.TryAddAllocation(alloc3))
        result = append(result, alloc3)
 
-       alloc4 := createAllocation("ask4", "app1", node.NodeID, true, false, 10,
+       alloc4 := createAllocation("ask4", "app1", node.NodeID, true, false, 
10, false,
                
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
        alloc4.createTime = createTime
        assert.Assert(t, node.TryAddAllocation(alloc4))
        result = append(result, alloc4)
 
-       alloc5 := createAllocation("ask5", "app1", node.NodeID, true, false, 5,
+       alloc5 := createAllocation("ask5", "app1", node.NodeID, true, false, 5, 
false,
                
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
        assert.Assert(t, node.TryAddAllocation(alloc5))
        result = append(result, alloc5)
 
        // opted out pods
-       alloc6 := createAllocation("ask6", "app1", node.NodeID, false, false, 
10,
+       alloc6 := createAllocation("ask6", "app1", node.NodeID, false, false, 
10, false,
                
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
        assert.Assert(t, node.TryAddAllocation(alloc6))
        result = append(result, alloc6)
 
-       alloc7 := createAllocation("ask7", "app1", node.NodeID, false, false, 
10,
+       alloc7 := createAllocation("ask7", "app1", node.NodeID, false, false, 
10, false,
                
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}))
        alloc7.createTime = createTime
        assert.Assert(t, node.TryAddAllocation(alloc7))
        result = append(result, alloc7)
 
-       alloc8 := createAllocation("ask8", "app1", node.NodeID, false, false, 
15,
+       alloc8 := createAllocation("ask8", "app1", node.NodeID, false, false, 
15, false,
                
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
        assert.Assert(t, node.TryAddAllocation(alloc8))
        result = append(result, alloc8)
 
        // driver/owner pods
-       alloc9 := createAllocation("ask9", "app1", node.NodeID, false, true, 10,
+       alloc9 := createAllocation("ask9", "app1", node.NodeID, false, true, 
10, false,
                
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
        alloc9.createTime = createTime
        assert.Assert(t, node.TryAddAllocation(alloc9))
        result = append(result, alloc9)
 
-       alloc10 := createAllocation("ask10", "app1", node.NodeID, true, true, 5,
+       alloc10 := createAllocation("ask10", "app1", node.NodeID, true, true, 
5, false,
                
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
        assert.Assert(t, node.TryAddAllocation(alloc10))
        result = append(result, alloc10)
@@ -201,7 +205,8 @@ func TestFilterAllocations(t *testing.T) {
                
resources.NewResourceFromMap(map[string]resources.Quantity{"second": 5}))
        p := NewRequiredNodePreemptor(node, requiredAsk)
        asks := prepareAllocationAsks(t, node)
-       p.filterAllocations()
+       result := p.filterAllocations()
+       verifyFilterResult(t, 10, 0, 10, 0, 0, result)
        filteredAllocations := p.getAllocations()
 
        // allocations are not even considered as there is no match. of course, 
no victims
@@ -213,7 +218,8 @@ func TestFilterAllocations(t *testing.T) {
                
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
        p1 := NewRequiredNodePreemptor(node, requiredAsk1)
        asks = prepareAllocationAsks(t, node)
-       p1.filterAllocations()
+       result = p1.filterAllocations()
+       verifyFilterResult(t, 10, 0, 0, 10, 0, result)
        filteredAllocations = p.getAllocations()
 
        // allocations are not even considered as there is no match. of course, 
no victims
@@ -225,27 +231,39 @@ func TestFilterAllocations(t *testing.T) {
                
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
        p2 := NewRequiredNodePreemptor(node, requiredAsk2)
        asks = prepareAllocationAsks(t, node)
-       p2.filterAllocations()
+       result = p2.filterAllocations()
+       verifyFilterResult(t, 10, 0, 0, 0, 0, result)
        filteredAllocations = p2.getAllocations()
        assert.Equal(t, len(filteredAllocations), 10)
        removeAllocationAsks(node, asks)
 
-       // case 4: allocation has preempted
+       // case 4: allocation has been preempted
        requiredAsk3 := createAllocationAsk("ask12", "app1", true, true, 20,
                
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
        p3 := NewRequiredNodePreemptor(node, requiredAsk3)
        asks = prepareAllocationAsks(t, node)
-       p3.filterAllocations()
+       node.GetAllocation("ask5").MarkPreempted() // "ask5" would be the first 
and only victim without this
+       result = p3.filterAllocations()
        p3.sortAllocations()
 
-       victims := p3.GetVictims()
-       for _, victim := range victims {
-               victim.MarkPreempted()
-       }
-       p3.filterAllocations()
+       verifyFilterResult(t, 10, 0, 0, 0, 1, result)
        filteredAllocations = p3.getAllocations()
-       assert.Equal(t, len(filteredAllocations), 19)
+       assert.Equal(t, len(filteredAllocations), 9) // "ask5" is no longer 
considered as a victim
        removeAllocationAsks(node, asks)
+
+       // case 5: existing required node allocation
+       requiredAsk4 := createAllocationAsk("ask12", "app1", true, true, 20,
+               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+       p4 := NewRequiredNodePreemptor(node, requiredAsk4)
+       _ = prepareAllocationAsks(t, node)
+       allocReqNode := createAllocation("ask11", "app1", node.NodeID, true, 
true, 5, true,
+               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+       assert.Assert(t, node.TryAddAllocation(allocReqNode))
+
+       result = p4.filterAllocations()
+       verifyFilterResult(t, 11, 1, 0, 0, 0, result)
+       filteredAllocations = p4.getAllocations()
+       assert.Equal(t, len(filteredAllocations), 10)
 }
 
 func TestGetVictims(t *testing.T) {
@@ -305,3 +323,12 @@ func TestGetVictims(t *testing.T) {
        assert.Equal(t, len(victims3), 0)
        removeAllocationAsks(node, asks)
 }
+
+func verifyFilterResult(t *testing.T, totalAllocations, 
requiredNodeAllocations, resourceNotEnough, higherPriorityAllocations, 
alreadyPreemptedAllocations int, result filteringResult) {
+       t.Helper()
+       assert.Equal(t, totalAllocations, result.totalAllocations)
+       assert.Equal(t, requiredNodeAllocations, result.requiredNodeAllocations)
+       assert.Equal(t, resourceNotEnough, result.atLeastOneResNotMatched)
+       assert.Equal(t, higherPriorityAllocations, 
result.higherPriorityAllocations)
+       assert.Equal(t, alreadyPreemptedAllocations, 
result.alreadyPreemptedAllocations)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to