This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/branch-1.6 by this push:
     new 6319824d [YUNIKORN-2980] DaemonSet preemption: Don't flood the logs if victim selection fails (#998)
6319824d is described below

commit 6319824de1426db4f0bb7cc28a5df35293b25894
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Nov 20 17:17:16 2024 -0600

    [YUNIKORN-2980] DaemonSet preemption: Don't flood the logs if victim selection fails (#998)
    
    Closes: #998
    
    Signed-off-by: Craig Condit <[email protected]>
---
 pkg/common/errors.go                            |   1 +
 pkg/scheduler/objects/allocation.go             |   5 +
 pkg/scheduler/objects/application.go            |   8 +-
 pkg/scheduler/objects/application_test.go       | 175 ++++++++++++++++++++++++
 pkg/scheduler/objects/events/ask_events.go      |  22 ++-
 pkg/scheduler/objects/events/ask_events_test.go |  40 +++++-
 6 files changed, 238 insertions(+), 13 deletions(-)

diff --git a/pkg/common/errors.go b/pkg/common/errors.go
index 1e8334ea..f73c84b4 100644
--- a/pkg/common/errors.go
+++ b/pkg/common/errors.go
@@ -27,3 +27,4 @@ const PreemptionPreconditionsFailed = "Preemption preconditions failed"
 const PreemptionDoesNotGuarantee = "Preemption queue guarantees check failed"
 const PreemptionShortfall = "Preemption helped but short of resources"
 const PreemptionDoesNotHelp = "Preemption does not help"
+const NoVictimForRequiredNode = "No fit on required node, preemption does not 
help"
diff --git a/pkg/scheduler/objects/allocation.go b/pkg/scheduler/objects/allocation.go
index 8f2450e9..3e1c1562 100644
--- a/pkg/scheduler/objects/allocation.go
+++ b/pkg/scheduler/objects/allocation.go
@@ -457,6 +457,11 @@ func (a *Allocation) SendPredicatesFailedEvent(predicateErrors map[string]int) {
        a.askEvents.SendPredicatesFailed(a.allocationKey, a.applicationID, 
predicateErrors, a.GetAllocatedResource())
 }
 
+// SendRequiredNodePreemptionFailedEvent updates the event system with 
required node preemption failed event.
+func (a *Allocation) SendRequiredNodePreemptionFailedEvent(node string) {
+       a.askEvents.SendRequiredNodePreemptionFailed(a.allocationKey, 
a.applicationID, node, a.GetAllocatedResource())
+}
+
 // GetAllocationLog returns a list of log entries corresponding to allocation 
preconditions not being met.
 func (a *Allocation) GetAllocationLog() []*AllocationLogEntry {
        a.RLock()
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index 0fe682aa..1347aa20 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -1406,9 +1406,6 @@ func (sa *Application) tryPreemption(headRoom *resources.Resource, preemptionDel
 }
 
 func (sa *Application) tryRequiredNodePreemption(reserve *reservation, ask 
*Allocation) bool {
-       log.Log(log.SchedApplication).Info("Triggering preemption process for 
daemon set ask",
-               zap.String("ds allocation key", ask.GetAllocationKey()))
-
        // try preemption and see if we can free up resource
        preemptor := NewRequiredNodePreemptor(reserve.node, ask)
        preemptor.filterAllocations()
@@ -1431,9 +1428,8 @@ func (sa *Application) tryRequiredNodePreemption(reserve *reservation, ask *Allo
                        "preempting allocations to free up resources to run 
daemon set ask: "+ask.GetAllocationKey())
                return true
        }
-       log.Log(log.SchedApplication).Warn("Problem in finding the victims for 
preempting resources to meet required ask requirements",
-               zap.String("ds allocation key", ask.GetAllocationKey()),
-               zap.String("node id", reserve.nodeID))
+       ask.LogAllocationFailure(common.NoVictimForRequiredNode, true)
+       ask.SendRequiredNodePreemptionFailedEvent(reserve.node.NodeID)
        return false
 }
 
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 91db298f..dc1abb31 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -21,6 +21,7 @@ package objects
 import (
        "fmt"
        "math"
+       "strings"
        "testing"
        "time"
 
@@ -2900,6 +2901,172 @@ func TestPredicateFailedEvents(t *testing.T) {
        assert.Equal(t, "Unschedulable request 'alloc-0': fake predicate plugin 
failed (2x); ", event.Message)
 }
 
+func TestRequiredNodePreemption(t *testing.T) {
+       // tests successful RequiredNode (DaemonSet) preemption
+       app := newApplication(appID0, "default", "root.default")
+       var releaseEvents []*rmevent.RMReleaseAllocationEvent
+       app.rmEventHandler = &mockAppEventHandler{
+               callback: func(ev interface{}) {
+                       if rmEvent, ok := 
ev.(*rmevent.RMReleaseAllocationEvent); ok {
+                               releaseEvents = append(releaseEvents, rmEvent)
+                               go func() {
+                                       rmEvent.Channel <- &rmevent.Result{
+                                               Succeeded: true,
+                                       }
+                               }()
+                       }
+               },
+       }
+       node := newNode(nodeID1, map[string]resources.Quantity{"first": 20})
+       node.nodeEvents = schedEvt.NewNodeEvents(mock.NewEventSystemDisabled())
+       iterator := getNodeIteratorFn(node)
+       getNode := func(nodeID string) *Node {
+               return node
+       }
+
+       // set queue
+       rootQ, err := createRootQueue(map[string]string{"first": "20"})
+       assert.NilError(t, err)
+       childQ, err := createManagedQueue(rootQ, "default", false, 
map[string]string{"first": "20"})
+       assert.NilError(t, err)
+       app.SetQueue(childQ)
+
+       // add an ask
+       mockEvents := mock.NewEventSystem()
+       askRes := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15})
+       ask1 := newAllocationAsk("ask-1", "app-1", askRes)
+       ask1.askEvents = schedEvt.NewAskEvents(mockEvents)
+       err = app.AddAllocationAsk(ask1)
+       assert.NilError(t, err, "could not add ask-1")
+       preemptionAttemptsRemaining := 0
+
+       // allocate ask
+       headRoom := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
+       result := app.tryAllocate(headRoom, true, 30*time.Second, 
&preemptionAttemptsRemaining, iterator, iterator, getNode)
+       assert.Equal(t, result.ResultType, Allocated, "could not allocate 
ask-1")
+       assert.Equal(t, result.Request.allocationKey, "ask-1", "unexpected 
allocation key")
+
+       // add ask2 with required node
+       ask2 := newAllocationAsk("ask-2", "app-1", askRes)
+       ask2.askEvents = schedEvt.NewAskEvents(mockEvents)
+       ask2.requiredNode = nodeID1
+       err = app.AddAllocationAsk(ask2)
+       assert.NilError(t, err, "could not add ask-2")
+
+       // try to allocate ask2 with node being full - expect a reservation
+       result = app.tryAllocate(headRoom, true, 30*time.Second, 
&preemptionAttemptsRemaining, iterator, iterator, getNode)
+       assert.Equal(t, result.ResultType, Reserved, "allocation result is not 
reserved")
+       assert.Equal(t, result.Request.allocationKey, "ask-2", "unexpected 
allocation key")
+       err = app.Reserve(node, ask2)
+       assert.NilError(t, err, "reservation failed")
+
+       // preemption
+       assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, 
"unexpected result from reserved allocation")
+       assert.Assert(t, ask1.IsPreempted(), "ask1 has not been preempted")
+       assert.Assert(t, ask2.HasTriggeredPreemption(), "ask2 has not triggered 
preemption")
+       assert.Equal(t, 1, len(releaseEvents), "unexpected number of release 
events")
+       assert.Equal(t, 1, len(releaseEvents[0].ReleasedAllocations), 
"unexpected number of release allocations")
+       assert.Equal(t, "ask-1", 
releaseEvents[0].ReleasedAllocations[0].AllocationKey, "allocation key")
+
+       // 2nd attempt - no preemption this time
+       releaseEvents = nil
+       assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, 
"unexpected result from reserved allocation")
+       assert.Assert(t, releaseEvents == nil, "unexpected release event")
+
+       // check for preemption related events
+       for _, event := range mockEvents.Events {
+               assert.Assert(t, 
!strings.Contains(strings.ToLower(event.Message), "preemption"), "received a 
preemption related event")
+       }
+}
+
+func TestRequiredNodePreemptionFailed(t *testing.T) {
+       // tests RequiredNode (DaemonSet) preemption where the victim pod has a 
high priority, hence preemption is not possible
+       app := newApplication(appID0, "default", "root.default")
+       var releaseEvents []*rmevent.RMReleaseAllocationEvent
+       app.rmEventHandler = &mockAppEventHandler{
+               callback: func(ev interface{}) {
+                       if rmEvent, ok := 
ev.(*rmevent.RMReleaseAllocationEvent); ok {
+                               releaseEvents = append(releaseEvents, rmEvent)
+                               go func() {
+                                       rmEvent.Channel <- &rmevent.Result{
+                                               Succeeded: true,
+                                       }
+                               }()
+                       }
+               },
+       }
+       node := newNode(nodeID1, map[string]resources.Quantity{"first": 20})
+       node.nodeEvents = schedEvt.NewNodeEvents(mock.NewEventSystemDisabled())
+       iterator := getNodeIteratorFn(node)
+       getNode := func(nodeID string) *Node {
+               return node
+       }
+
+       // set queue
+       rootQ, err := createRootQueue(map[string]string{"first": "20"})
+       assert.NilError(t, err)
+       childQ, err := createManagedQueue(rootQ, "default", false, 
map[string]string{"first": "20"})
+       assert.NilError(t, err)
+       app.SetQueue(childQ)
+
+       // add an ask with high priority
+       mockEvents := mock.NewEventSystem()
+       askRes := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15})
+       ask1 := newAllocationAsk("ask-1", "app-1", askRes)
+       ask1.askEvents = schedEvt.NewAskEvents(mockEvents)
+       ask1.priority = 1000
+       err = app.AddAllocationAsk(ask1)
+       assert.NilError(t, err, "could not add ask-1")
+       preemptionAttemptsRemaining := 0
+
+       // allocate ask
+       headRoom := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
+       result := app.tryAllocate(headRoom, true, 30*time.Second, 
&preemptionAttemptsRemaining, iterator, iterator, getNode)
+       assert.Equal(t, result.ResultType, Allocated, "could not allocate 
ask-1")
+       assert.Equal(t, result.Request.allocationKey, "ask-1", "unexpected 
allocation key")
+
+       // add ask2 with required node
+       ask2 := newAllocationAsk("ask-2", "app-1", askRes)
+       ask2.askEvents = schedEvt.NewAskEvents(mockEvents)
+       ask2.requiredNode = nodeID1
+       err = app.AddAllocationAsk(ask2)
+       assert.NilError(t, err, "could not add ask-2")
+
+       // try to allocate ask2 with node being full - expect a reservation
+       result = app.tryAllocate(headRoom, true, 30*time.Second, 
&preemptionAttemptsRemaining, iterator, iterator, getNode)
+       assert.Equal(t, result.ResultType, Reserved, "allocation result is not 
reserved")
+       assert.Equal(t, result.Request.allocationKey, "ask-2", "unexpected 
allocation key")
+       err = app.Reserve(node, ask2)
+       assert.NilError(t, err, "reservation failed")
+
+       // try preemption - should not succeed
+       assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, 
"unexpected result from reserved allocation")
+       assert.Assert(t, !ask1.IsPreempted(), "unexpected preemption of ask1")
+       assert.Assert(t, !ask2.HasTriggeredPreemption(), "unexpected preemption 
triggered from ask2")
+       assert.Equal(t, 0, len(releaseEvents), "unexpected number of release 
events")
+       // check for events
+       noEvents := 0
+       var requestEvt *si.EventRecord
+       for _, event := range mockEvents.Events {
+               if event.Type == si.EventRecord_REQUEST && 
strings.Contains(strings.ToLower(event.Message), "preemption") {
+                       noEvents++
+                       requestEvt = event
+               }
+       }
+       assert.Equal(t, 1, noEvents, "unexpected number of REQUEST events")
+       assert.Equal(t, "Unschedulable request 'ask-2' with required node 
'node-1', no preemption victim found", requestEvt.Message)
+       assert.Equal(t, 1, len(ask2.allocLog), "unexpected number of entries in 
the allocation log")
+       assert.Equal(t, int32(1), 
ask2.allocLog[common.NoVictimForRequiredNode].Count, "incorrect number of entry 
count")
+       assert.Equal(t, common.NoVictimForRequiredNode, 
ask2.allocLog[common.NoVictimForRequiredNode].Message, "unexpected log message")
+
+       // check counting & event throttling
+       assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, 
"unexpected result from reserved allocation")
+       assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, 
"unexpected result from reserved allocation")
+       assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, 
"unexpected result from reserved allocation")
+       assert.Equal(t, 1, noEvents, "unexpected number of REQUEST events")
+       assert.Equal(t, int32(4), 
ask2.allocLog[common.NoVictimForRequiredNode].Count, "incorrect number of entry 
count")
+}
+
 type testIterator struct{}
 
 func (testIterator) ForEachNode(fn func(*Node) bool) {
@@ -3019,3 +3186,11 @@ func TestGetUint64Tag(t *testing.T) {
                })
        }
 }
+
+type mockAppEventHandler struct {
+       callback func(ev interface{})
+}
+
+func (m mockAppEventHandler) HandleEvent(ev interface{}) {
+       m.callback(ev)
+}
diff --git a/pkg/scheduler/objects/events/ask_events.go b/pkg/scheduler/objects/events/ask_events.go
index 64265c6d..0e10a253 100644
--- a/pkg/scheduler/objects/events/ask_events.go
+++ b/pkg/scheduler/objects/events/ask_events.go
@@ -32,8 +32,9 @@ import (
 
 // AskEvents Request-specific events. These events are of REQUEST type, so 
they are eventually sent to the respective pods in K8s.
 type AskEvents struct {
-       eventSystem events.EventSystem
-       limiter     *rate.Limiter
+       eventSystem      events.EventSystem
+       predicateLimiter *rate.Limiter
+       reqNodeLimiter   *rate.Limiter
 }
 
 func (ae *AskEvents) SendRequestExceedsQueueHeadroom(allocKey, appID string, 
headroom, allocatedResource *resources.Resource, queuePath string) {
@@ -73,7 +74,7 @@ func (ae *AskEvents) SendRequestFitsInUserQuota(allocKey, appID string, allocate
 }
 
 func (ae *AskEvents) SendPredicatesFailed(allocKey, appID string, 
predicateErrors map[string]int, allocatedResource *resources.Resource) {
-       if !ae.eventSystem.IsEventTrackingEnabled() || !ae.limiter.Allow() {
+       if !ae.eventSystem.IsEventTrackingEnabled() || 
!ae.predicateLimiter.Allow() {
                return
        }
 
@@ -94,13 +95,24 @@ func (ae *AskEvents) SendPredicatesFailed(allocKey, appID string, predicateError
        ae.eventSystem.AddEvent(event)
 }
 
+func (ae *AskEvents) SendRequiredNodePreemptionFailed(allocKey, appID, node 
string, allocatedResource *resources.Resource) {
+       if !ae.eventSystem.IsEventTrackingEnabled() || 
!ae.reqNodeLimiter.Allow() {
+               return
+       }
+
+       message := fmt.Sprintf("Unschedulable request '%s' with required node 
'%s', no preemption victim found", allocKey, node)
+       event := events.CreateRequestEventRecord(allocKey, appID, message, 
allocatedResource)
+       ae.eventSystem.AddEvent(event)
+}
+
 func NewAskEvents(evt events.EventSystem) *AskEvents {
        return newAskEventsWithRate(evt, 15*time.Second, 1)
 }
 
 func newAskEventsWithRate(evt events.EventSystem, interval time.Duration, 
burst int) *AskEvents {
        return &AskEvents{
-               eventSystem: evt,
-               limiter:     rate.NewLimiter(rate.Every(interval), burst),
+               eventSystem:      evt,
+               predicateLimiter: rate.NewLimiter(rate.Every(interval), burst),
+               reqNodeLimiter:   rate.NewLimiter(rate.Every(interval), burst),
        }
 }
diff --git a/pkg/scheduler/objects/events/ask_events_test.go b/pkg/scheduler/objects/events/ask_events_test.go
index add2fc53..8f6f8558 100644
--- a/pkg/scheduler/objects/events/ask_events_test.go
+++ b/pkg/scheduler/objects/events/ask_events_test.go
@@ -133,12 +133,48 @@ func TestPredicateFailedEvents(t *testing.T) {
        assert.Equal(t, 1, len(eventSystem.Events))
        event := eventSystem.Events[0]
        assert.Equal(t, "Unschedulable request 'alloc-0': error#0 (2x); error#1 
(123x); error#2 (44x); ", event.Message)
+       assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+       assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+       assert.Equal(t, "alloc-0", event.ObjectID)
+       assert.Equal(t, "app-0", event.ReferenceID)
 
        eventSystem.Reset()
        // wait a bit, a new event is expected
        time.Sleep(100 * time.Millisecond)
-       events.SendPredicatesFailed("alloc-0", "app-0", errors, resource)
+       events.SendPredicatesFailed("alloc-1", "app-0", errors, resource)
        assert.Equal(t, 1, len(eventSystem.Events))
        event = eventSystem.Events[0]
-       assert.Equal(t, "Unschedulable request 'alloc-0': error#0 (2x); error#1 
(123x); error#2 (44x); ", event.Message)
+       assert.Equal(t, "Unschedulable request 'alloc-1': error#0 (2x); error#1 
(123x); error#2 (44x); ", event.Message)
+}
+
+func TestRequiredNodePreemptionFailedEvents(t *testing.T) {
+       resource := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
+       eventSystem := mock.NewEventSystemDisabled()
+       events := NewAskEvents(eventSystem)
+       events.SendRequiredNodePreemptionFailed("alloc-0", "app-0", nodeID1, 
resource)
+       assert.Equal(t, 0, len(eventSystem.Events))
+
+       eventSystem = mock.NewEventSystem()
+       events = newAskEventsWithRate(eventSystem, 50*time.Millisecond, 1)
+       // only the first event is expected to be emitted due to rate limiting
+       for i := 0; i < 200; i++ {
+               events.SendRequiredNodePreemptionFailed("alloc-0", "app-0", 
nodeID1, resource)
+       }
+       assert.Equal(t, 1, len(eventSystem.Events))
+       event := eventSystem.Events[0]
+       assert.Equal(t, "Unschedulable request 'alloc-0' with required node 
'node-1', no preemption victim found", event.Message)
+       assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+       assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+       assert.Equal(t, "alloc-0", event.ObjectID)
+       assert.Equal(t, "app-0", event.ReferenceID)
+       protoRes := resources.NewResourceFromProto(event.Resource)
+       assert.DeepEqual(t, resource, protoRes)
+
+       eventSystem.Reset()
+       // wait a bit, a new event is expected
+       time.Sleep(100 * time.Millisecond)
+       events.SendRequiredNodePreemptionFailed("alloc-1", "app-0", nodeID1, 
resource)
+       assert.Equal(t, 1, len(eventSystem.Events))
+       event = eventSystem.Events[0]
+       assert.Equal(t, "Unschedulable request 'alloc-1' with required node 
'node-1', no preemption victim found", event.Message)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to