This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/branch-1.6 by this push:
new 6319824d [YUNIKORN-2980] DaemonSet preemption: Don't flood the logs if victim selection fails (#998)
6319824d is described below
commit 6319824de1426db4f0bb7cc28a5df35293b25894
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Nov 20 17:17:16 2024 -0600
[YUNIKORN-2980] DaemonSet preemption: Don't flood the logs if victim selection fails (#998)
Closes: #998
Signed-off-by: Craig Condit <[email protected]>
---
pkg/common/errors.go | 1 +
pkg/scheduler/objects/allocation.go | 5 +
pkg/scheduler/objects/application.go | 8 +-
pkg/scheduler/objects/application_test.go | 175 ++++++++++++++++++++++++
pkg/scheduler/objects/events/ask_events.go | 22 ++-
pkg/scheduler/objects/events/ask_events_test.go | 40 +++++-
6 files changed, 238 insertions(+), 13 deletions(-)
diff --git a/pkg/common/errors.go b/pkg/common/errors.go
index 1e8334ea..f73c84b4 100644
--- a/pkg/common/errors.go
+++ b/pkg/common/errors.go
@@ -27,3 +27,4 @@ const PreemptionPreconditionsFailed = "Preemption preconditions failed"
 const PreemptionDoesNotGuarantee = "Preemption queue guarantees check failed"
 const PreemptionShortfall = "Preemption helped but short of resources"
 const PreemptionDoesNotHelp = "Preemption does not help"
+const NoVictimForRequiredNode = "No fit on required node, preemption does not help"
diff --git a/pkg/scheduler/objects/allocation.go b/pkg/scheduler/objects/allocation.go
index 8f2450e9..3e1c1562 100644
--- a/pkg/scheduler/objects/allocation.go
+++ b/pkg/scheduler/objects/allocation.go
@@ -457,6 +457,11 @@ func (a *Allocation) SendPredicatesFailedEvent(predicateErrors map[string]int) {
 	a.askEvents.SendPredicatesFailed(a.allocationKey, a.applicationID, predicateErrors, a.GetAllocatedResource())
 }
 
+// SendRequiredNodePreemptionFailedEvent updates the event system with required node preemption failed event.
+func (a *Allocation) SendRequiredNodePreemptionFailedEvent(node string) {
+	a.askEvents.SendRequiredNodePreemptionFailed(a.allocationKey, a.applicationID, node, a.GetAllocatedResource())
+}
+
 // GetAllocationLog returns a list of log entries corresponding to allocation preconditions not being met.
 func (a *Allocation) GetAllocationLog() []*AllocationLogEntry {
 	a.RLock()
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index 0fe682aa..1347aa20 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -1406,9 +1406,6 @@ func (sa *Application) tryPreemption(headRoom *resources.Resource, preemptionDel
 }
 
 func (sa *Application) tryRequiredNodePreemption(reserve *reservation, ask *Allocation) bool {
-	log.Log(log.SchedApplication).Info("Triggering preemption process for daemon set ask",
-		zap.String("ds allocation key", ask.GetAllocationKey()))
-
 	// try preemption and see if we can free up resource
 	preemptor := NewRequiredNodePreemptor(reserve.node, ask)
 	preemptor.filterAllocations()
@@ -1431,9 +1428,8 @@ func (sa *Application) tryRequiredNodePreemption(reserve *reservation, ask *Allo
 			"preempting allocations to free up resources to run daemon set ask: "+ask.GetAllocationKey())
 		return true
 	}
-	log.Log(log.SchedApplication).Warn("Problem in finding the victims for preempting resources to meet required ask requirements",
-		zap.String("ds allocation key", ask.GetAllocationKey()),
-		zap.String("node id", reserve.nodeID))
+	ask.LogAllocationFailure(common.NoVictimForRequiredNode, true)
+	ask.SendRequiredNodePreemptionFailedEvent(reserve.node.NodeID)
 	return false
 }
 
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 91db298f..dc1abb31 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -21,6 +21,7 @@ package objects
 import (
 	"fmt"
 	"math"
+	"strings"
 	"testing"
 	"time"
 
@@ -2900,6 +2901,174 @@ func TestPredicateFailedEvents(t *testing.T) {
 	assert.Equal(t, "Unschedulable request 'alloc-0': fake predicate plugin failed (2x); ", event.Message)
 }
 
+func TestRequiredNodePreemption(t *testing.T) {
+	// tests successful RequiredNode (DaemonSet) preemption
+	app := newApplication(appID0, "default", "root.default")
+	var releaseEvents []*rmevent.RMReleaseAllocationEvent
+	app.rmEventHandler = &mockAppEventHandler{
+		callback: func(ev interface{}) {
+			if rmEvent, ok := ev.(*rmevent.RMReleaseAllocationEvent); ok {
+				releaseEvents = append(releaseEvents, rmEvent)
+				go func() {
+					rmEvent.Channel <- &rmevent.Result{
+						Succeeded: true,
+					}
+				}()
+			}
+		},
+	}
+	node := newNode(nodeID1, map[string]resources.Quantity{"first": 20})
+	node.nodeEvents = schedEvt.NewNodeEvents(mock.NewEventSystemDisabled())
+	iterator := getNodeIteratorFn(node)
+	getNode := func(nodeID string) *Node {
+		return node
+	}
+
+	// set queue
+	rootQ, err := createRootQueue(map[string]string{"first": "20"})
+	assert.NilError(t, err)
+	childQ, err := createManagedQueue(rootQ, "default", false, map[string]string{"first": "20"})
+	assert.NilError(t, err)
+	app.SetQueue(childQ)
+
+	// add an ask
+	mockEvents := mock.NewEventSystem()
+	askRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15})
+	ask1 := newAllocationAsk("ask-1", "app-1", askRes)
+	ask1.askEvents = schedEvt.NewAskEvents(mockEvents)
+	err = app.AddAllocationAsk(ask1)
+	assert.NilError(t, err, "could not add ask-1")
+	preemptionAttemptsRemaining := 0
+
+	// allocate ask
+	headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
+	result := app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
+	assert.Equal(t, result.ResultType, Allocated, "could not allocate ask-1")
+	assert.Equal(t, result.Request.allocationKey, "ask-1", "unexpected allocation key")
+
+	// add ask2 with required node
+	ask2 := newAllocationAsk("ask-2", "app-1", askRes)
+	ask2.askEvents = schedEvt.NewAskEvents(mockEvents)
+	ask2.requiredNode = nodeID1
+	err = app.AddAllocationAsk(ask2)
+	assert.NilError(t, err, "could not add ask-2")
+
+	// try to allocate ask2 with node being full - expect a reservation
+	result = app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
+	assert.Equal(t, result.ResultType, Reserved, "allocation result is not reserved")
+	assert.Equal(t, result.Request.allocationKey, "ask-2", "unexpected allocation key")
+	err = app.Reserve(node, ask2)
+	assert.NilError(t, err, "reservation failed")
+
+	// preemption
+	assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
+	assert.Assert(t, ask1.IsPreempted(), "ask1 has not been preempted")
+	assert.Assert(t, ask2.HasTriggeredPreemption(), "ask2 has not triggered preemption")
+	assert.Equal(t, 1, len(releaseEvents), "unexpected number of release events")
+	assert.Equal(t, 1, len(releaseEvents[0].ReleasedAllocations), "unexpected number of release allocations")
+	assert.Equal(t, "ask-1", releaseEvents[0].ReleasedAllocations[0].AllocationKey, "allocation key")
+
+	// 2nd attempt - no preemption this time
+	releaseEvents = nil
+	assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
+	assert.Assert(t, releaseEvents == nil, "unexpected release event")
+
+	// check for preemption related events
+	for _, event := range mockEvents.Events {
+		assert.Assert(t, !strings.Contains(strings.ToLower(event.Message), "preemption"), "received a preemption related event")
+	}
+}
+
+func TestRequiredNodePreemptionFailed(t *testing.T) {
+	// tests RequiredNode (DaemonSet) preemption where the victim pod has a high priority, hence preemption is not possible
+	app := newApplication(appID0, "default", "root.default")
+	var releaseEvents []*rmevent.RMReleaseAllocationEvent
+	app.rmEventHandler = &mockAppEventHandler{
+		callback: func(ev interface{}) {
+			if rmEvent, ok := ev.(*rmevent.RMReleaseAllocationEvent); ok {
+				releaseEvents = append(releaseEvents, rmEvent)
+				go func() {
+					rmEvent.Channel <- &rmevent.Result{
+						Succeeded: true,
+					}
+				}()
+			}
+		},
+	}
+	node := newNode(nodeID1, map[string]resources.Quantity{"first": 20})
+	node.nodeEvents = schedEvt.NewNodeEvents(mock.NewEventSystemDisabled())
+	iterator := getNodeIteratorFn(node)
+	getNode := func(nodeID string) *Node {
+		return node
+	}
+
+	// set queue
+	rootQ, err := createRootQueue(map[string]string{"first": "20"})
+	assert.NilError(t, err)
+	childQ, err := createManagedQueue(rootQ, "default", false, map[string]string{"first": "20"})
+	assert.NilError(t, err)
+	app.SetQueue(childQ)
+
+	// add an ask with high priority
+	mockEvents := mock.NewEventSystem()
+	askRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15})
+	ask1 := newAllocationAsk("ask-1", "app-1", askRes)
+	ask1.askEvents = schedEvt.NewAskEvents(mockEvents)
+	ask1.priority = 1000
+	err = app.AddAllocationAsk(ask1)
+	assert.NilError(t, err, "could not add ask-1")
+	preemptionAttemptsRemaining := 0
+
+	// allocate ask
+	headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
+	result := app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
+	assert.Equal(t, result.ResultType, Allocated, "could not allocate ask-1")
+	assert.Equal(t, result.Request.allocationKey, "ask-1", "unexpected allocation key")
+
+	// add ask2 with required node
+	ask2 := newAllocationAsk("ask-2", "app-1", askRes)
+	ask2.askEvents = schedEvt.NewAskEvents(mockEvents)
+	ask2.requiredNode = nodeID1
+	err = app.AddAllocationAsk(ask2)
+	assert.NilError(t, err, "could not add ask-2")
+
+	// try to allocate ask2 with node being full - expect a reservation
+	result = app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
+	assert.Equal(t, result.ResultType, Reserved, "allocation result is not reserved")
+	assert.Equal(t, result.Request.allocationKey, "ask-2", "unexpected allocation key")
+	err = app.Reserve(node, ask2)
+	assert.NilError(t, err, "reservation failed")
+
+	// try preemption - should not succeed
+	assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
+	assert.Assert(t, !ask1.IsPreempted(), "unexpected preemption of ask1")
+	assert.Assert(t, !ask2.HasTriggeredPreemption(), "unexpected preemption triggered from ask2")
+	assert.Equal(t, 0, len(releaseEvents), "unexpected number of release events")
+	// check for events: count preemption related REQUEST events (re-evaluated on every call), remember the last one
+	var requestEvt *si.EventRecord
+	countPreemptionEvents := func() (count int) {
+		for _, event := range mockEvents.Events {
+			if event.Type == si.EventRecord_REQUEST && strings.Contains(strings.ToLower(event.Message), "preemption") {
+				count++
+				requestEvt = event
+			}
+		}
+		return count
+	}
+	assert.Equal(t, 1, countPreemptionEvents(), "unexpected number of REQUEST events")
+	assert.Equal(t, "Unschedulable request 'ask-2' with required node 'node-1', no preemption victim found", requestEvt.Message)
+	assert.Equal(t, 1, len(ask2.allocLog), "unexpected number of entries in the allocation log")
+	assert.Equal(t, int32(1), ask2.allocLog[common.NoVictimForRequiredNode].Count, "incorrect number of entry count")
+	assert.Equal(t, common.NoVictimForRequiredNode, ask2.allocLog[common.NoVictimForRequiredNode].Message, "unexpected log message")
+
+	// check counting & event throttling: every attempt is counted in the log, the event itself is rate limited
+	assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
+	assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
+	assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
+	assert.Equal(t, 1, countPreemptionEvents(), "unexpected number of REQUEST events")
+	assert.Equal(t, int32(4), ask2.allocLog[common.NoVictimForRequiredNode].Count, "incorrect number of entry count")
+}
+
 type testIterator struct{}
 
 func (testIterator) ForEachNode(fn func(*Node) bool) {
@@ -3019,3 +3188,12 @@ func TestGetUint64Tag(t *testing.T) {
 		})
 	}
 }
+
+// mockAppEventHandler is a test event handler that passes every event it receives to callback.
+type mockAppEventHandler struct {
+	callback func(ev interface{})
+}
+
+func (m mockAppEventHandler) HandleEvent(ev interface{}) {
+	m.callback(ev)
+}
diff --git a/pkg/scheduler/objects/events/ask_events.go b/pkg/scheduler/objects/events/ask_events.go
index 64265c6d..0e10a253 100644
--- a/pkg/scheduler/objects/events/ask_events.go
+++ b/pkg/scheduler/objects/events/ask_events.go
@@ -32,8 +32,9 @@ import (
 
 // AskEvents Request-specific events. These events are of REQUEST type, so they are eventually sent to the respective pods in K8s.
 type AskEvents struct {
-	eventSystem events.EventSystem
-	limiter     *rate.Limiter
+	eventSystem      events.EventSystem
+	predicateLimiter *rate.Limiter
+	reqNodeLimiter   *rate.Limiter
 }
 
 func (ae *AskEvents) SendRequestExceedsQueueHeadroom(allocKey, appID string, headroom, allocatedResource *resources.Resource, queuePath string) {
@@ -73,7 +74,7 @@ func (ae *AskEvents) SendRequestFitsInUserQuota(allocKey, appID string, allocate
 }
 
 func (ae *AskEvents) SendPredicatesFailed(allocKey, appID string, predicateErrors map[string]int, allocatedResource *resources.Resource) {
-	if !ae.eventSystem.IsEventTrackingEnabled() || !ae.limiter.Allow() {
+	if !ae.eventSystem.IsEventTrackingEnabled() || !ae.predicateLimiter.Allow() {
 		return
 	}
 
@@ -94,13 +95,24 @@ func (ae *AskEvents) SendPredicatesFailed(allocKey, appID string, predicateError
 	ae.eventSystem.AddEvent(event)
 }
 
+func (ae *AskEvents) SendRequiredNodePreemptionFailed(allocKey, appID, node string, allocatedResource *resources.Resource) {
+	if !ae.eventSystem.IsEventTrackingEnabled() || !ae.reqNodeLimiter.Allow() {
+		return
+	}
+
+	message := fmt.Sprintf("Unschedulable request '%s' with required node '%s', no preemption victim found", allocKey, node)
+	event := events.CreateRequestEventRecord(allocKey, appID, message, allocatedResource)
+	ae.eventSystem.AddEvent(event)
+}
+
 func NewAskEvents(evt events.EventSystem) *AskEvents {
 	return newAskEventsWithRate(evt, 15*time.Second, 1)
 }
 
 func newAskEventsWithRate(evt events.EventSystem, interval time.Duration, burst int) *AskEvents {
 	return &AskEvents{
-		eventSystem: evt,
-		limiter:     rate.NewLimiter(rate.Every(interval), burst),
+		eventSystem:      evt,
+		predicateLimiter: rate.NewLimiter(rate.Every(interval), burst),
+		reqNodeLimiter:   rate.NewLimiter(rate.Every(interval), burst),
 	}
 }
diff --git a/pkg/scheduler/objects/events/ask_events_test.go b/pkg/scheduler/objects/events/ask_events_test.go
index add2fc53..8f6f8558 100644
--- a/pkg/scheduler/objects/events/ask_events_test.go
+++ b/pkg/scheduler/objects/events/ask_events_test.go
@@ -133,12 +133,48 @@ func TestPredicateFailedEvents(t *testing.T) {
 	assert.Equal(t, 1, len(eventSystem.Events))
 	event := eventSystem.Events[0]
 	assert.Equal(t, "Unschedulable request 'alloc-0': error#0 (2x); error#1 (123x); error#2 (44x); ", event.Message)
+	assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+	assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+	assert.Equal(t, "alloc-0", event.ObjectID)
+	assert.Equal(t, "app-0", event.ReferenceID)
 
 	eventSystem.Reset()
 	// wait a bit, a new event is expected
 	time.Sleep(100 * time.Millisecond)
-	events.SendPredicatesFailed("alloc-0", "app-0", errors, resource)
+	events.SendPredicatesFailed("alloc-1", "app-0", errors, resource)
 	assert.Equal(t, 1, len(eventSystem.Events))
 	event = eventSystem.Events[0]
-	assert.Equal(t, "Unschedulable request 'alloc-0': error#0 (2x); error#1 (123x); error#2 (44x); ", event.Message)
+	assert.Equal(t, "Unschedulable request 'alloc-1': error#0 (2x); error#1 (123x); error#2 (44x); ", event.Message)
+}
+
+func TestRequiredNodePreemptionFailedEvents(t *testing.T) {
+	resource := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
+	eventSystem := mock.NewEventSystemDisabled()
+	events := NewAskEvents(eventSystem)
+	events.SendRequiredNodePreemptionFailed("alloc-0", "app-0", nodeID1, resource)
+	assert.Equal(t, 0, len(eventSystem.Events))
+
+	eventSystem = mock.NewEventSystem()
+	events = newAskEventsWithRate(eventSystem, 50*time.Millisecond, 1)
+	// only the first event is expected to be emitted due to rate limiting
+	for i := 0; i < 200; i++ {
+		events.SendRequiredNodePreemptionFailed("alloc-0", "app-0", nodeID1, resource)
+	}
+	assert.Equal(t, 1, len(eventSystem.Events))
+	event := eventSystem.Events[0]
+	assert.Equal(t, "Unschedulable request 'alloc-0' with required node 'node-1', no preemption victim found", event.Message)
+	assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+	assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+	assert.Equal(t, "alloc-0", event.ObjectID)
+	assert.Equal(t, "app-0", event.ReferenceID)
+	protoRes := resources.NewResourceFromProto(event.Resource)
+	assert.DeepEqual(t, resource, protoRes)
+
+	eventSystem.Reset()
+	// wait a bit, a new event is expected
+	time.Sleep(100 * time.Millisecond)
+	events.SendRequiredNodePreemptionFailed("alloc-1", "app-0", nodeID1, resource)
+	assert.Equal(t, 1, len(eventSystem.Events))
+	event = eventSystem.Events[0]
+	assert.Equal(t, "Unschedulable request 'alloc-1' with required node 'node-1', no preemption victim found", event.Message)
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]