This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 0792325c [YUNIKORN-2766] Only generate event if all predicates failed (#919)
0792325c is described below
commit 0792325c9cb88ee146831fa781f5142b8a951b8a
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Jul 31 09:40:11 2024 +0200
[YUNIKORN-2766] Only generate event if all predicates failed (#919)
Closes: #919
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/scheduler/objects/allocation.go | 7 ++--
pkg/scheduler/objects/allocation_test.go | 8 ++--
pkg/scheduler/objects/application.go | 42 ++++++++++++++-------
pkg/scheduler/objects/application_test.go | 49 +++++++++++++++++++++++++
pkg/scheduler/objects/events/ask_events.go | 19 +++++++++-
pkg/scheduler/objects/events/ask_events_test.go | 16 +++++---
pkg/scheduler/objects/node.go | 11 +++---
pkg/scheduler/objects/node_test.go | 19 +++-------
8 files changed, 124 insertions(+), 47 deletions(-)
diff --git a/pkg/scheduler/objects/allocation.go b/pkg/scheduler/objects/allocation.go
index 6baf925e..f75039bc 100644
--- a/pkg/scheduler/objects/allocation.go
+++ b/pkg/scheduler/objects/allocation.go
@@ -489,9 +489,10 @@ func (a *Allocation) LogAllocationFailure(message string, allocate bool) {
entry.Count++
}
-// SendPredicateFailedEvent updates the event system with the reason for a
predicate failure.
-func (a *Allocation) SendPredicateFailedEvent(message string) {
- a.askEvents.SendPredicateFailed(a.allocationKey, a.applicationID,
message, a.GetAllocatedResource())
+// SendPredicatesFailedEvent updates the event system with the reason for
predicate failures.
+// The map predicateErrors contains how many times certain predicates failed
in the scheduling cycle for this ask.
+func (a *Allocation) SendPredicatesFailedEvent(predicateErrors map[string]int)
{
+ a.askEvents.SendPredicatesFailed(a.allocationKey, a.applicationID,
predicateErrors, a.GetAllocatedResource())
}
// GetAllocationLog returns a list of log entries corresponding to allocation
preconditions not being met.
diff --git a/pkg/scheduler/objects/allocation_test.go b/pkg/scheduler/objects/allocation_test.go
index fd513d57..3fe805c9 100644
--- a/pkg/scheduler/objects/allocation_test.go
+++ b/pkg/scheduler/objects/allocation_test.go
@@ -210,15 +210,17 @@ func TestSendPredicateFailed(t *testing.T) {
ask := NewAllocationAskFromSI(siAsk)
eventSystem := mock.NewEventSystemDisabled()
ask.askEvents = schedEvt.NewAskEvents(eventSystem)
- ask.SendPredicateFailedEvent("failed")
+ ask.SendPredicatesFailedEvent(map[string]int{})
assert.Equal(t, 0, len(eventSystem.Events))
eventSystem = mock.NewEventSystem()
ask.askEvents = schedEvt.NewAskEvents(eventSystem)
- ask.SendPredicateFailedEvent("failure")
+ ask.SendPredicatesFailedEvent(map[string]int{
+ "failure": 1,
+ })
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
- assert.Equal(t, "Predicate failed for request 'ask-1' with message:
'failure'", event.Message)
+ assert.Equal(t, "Unschedulable request 'ask-1': failure (1x); ",
event.Message)
}
func TestCreateTime(t *testing.T) {
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index b5a5bf06..563db8f4 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -1013,7 +1013,8 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption
return nil
}
}
- result := sa.tryNode(node, request)
+ // we don't care about predicate error messages here
+ result, _ := sa.tryNode(node, request) //nolint:errcheck
if result != nil {
// check if the node was reserved and we
allocated after a release
if _, ok :=
sa.reservations[reservationKey(node, nil, request)]; ok {
@@ -1157,7 +1158,7 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator,
node := getNodeFn(ph.GetNodeID())
// got the node run same checks as for reservation (all
but fits)
// resource usage should not change anyway between
placeholder and real one at this point
- if node != nil && node.preReserveConditions(request) {
+ if node != nil && node.preReserveConditions(request) ==
nil {
_, err := sa.allocateAsk(request)
if err != nil {
log.Log(log.SchedApplication).Warn("allocation of ask failed unexpectedly",
@@ -1198,7 +1199,7 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator,
return true
}
// skip the node if conditions can not be satisfied
- if !node.preAllocateConditions(reqFit) {
+ if err := node.preAllocateConditions(reqFit); err !=
nil {
return true
}
// update just the node to make sure we keep its spot
@@ -1285,7 +1286,8 @@ func (sa *Application) tryReservedAllocate(headRoom *resources.Resource, nodeIte
}
}
// check allocation possibility
- result := sa.tryNode(reserve.node, ask)
+ // we don't care about predicate error messages here
+ result, _ := sa.tryNode(reserve.node, ask) //nolint:errcheck
// allocation worked fix the resultType and return
if result != nil {
@@ -1379,7 +1381,8 @@ func (sa *Application) tryNodesNoReserve(ask *Allocation, iterator NodeIterator,
if !node.FitInNode(ask.GetAllocatedResource()) || node.NodeID
== reservedNode {
return true
}
- result := sa.tryNode(node, ask)
+ // we don't care about predicate error messages here
+ result, _ := sa.tryNode(node, ask) //nolint:errcheck
// allocation worked: update resultType and return
if result != nil {
result.ResultType = AllocatedReserved
@@ -1404,6 +1407,7 @@ func (sa *Application) tryNodes(ask *Allocation, iterator NodeIterator) *Allocat
reservedAsks := sa.GetAskReservations(allocKey)
allowReserve := !ask.IsAllocated() && len(reservedAsks) == 0
var allocResult *AllocationResult
+ var predicateErrors map[string]int
iterator.ForEachNode(func(node *Node) bool {
// skip the node if the node is not valid for the ask
if !node.IsSchedulable() {
@@ -1417,7 +1421,13 @@ func (sa *Application) tryNodes(ask *Allocation, iterator NodeIterator) *Allocat
return true
}
tryNodeStart := time.Now()
- result := sa.tryNode(node, ask)
+ result, err := sa.tryNode(node, ask)
+ if err != nil {
+ if predicateErrors == nil {
+ predicateErrors = make(map[string]int)
+ }
+ predicateErrors[err.Error()]++
+ }
// allocation worked so return
if result != nil {
metrics.GetSchedulerMetrics().ObserveTryNodeLatency(tryNodeStart)
@@ -1472,6 +1482,10 @@ func (sa *Application) tryNodes(ask *Allocation, iterator NodeIterator) *Allocat
return allocResult
}
+ if predicateErrors != nil {
+ ask.SendPredicatesFailedEvent(predicateErrors)
+ }
+
// we have not allocated yet, check if we should reserve
// NOTE: the node should not be reserved as the iterator filters them
but we do not lock the nodes
if nodeToReserve != nil && !nodeToReserve.IsReserved() {
@@ -1481,7 +1495,7 @@ func (sa *Application) tryNodes(ask *Allocation, iterator NodeIterator) *Allocat
zap.String("allocationKey", allocKey),
zap.Int("reservations", len(reservedAsks)))
// skip the node if conditions can not be satisfied
- if !nodeToReserve.preReserveConditions(ask) {
+ if nodeToReserve.preReserveConditions(ask) != nil {
return nil
}
// return reservation allocation and mark it as a reservation
@@ -1492,16 +1506,16 @@ func (sa *Application) tryNodes(ask *Allocation, iterator NodeIterator) *Allocat
}
// Try allocating on one specific node
-func (sa *Application) tryNode(node *Node, ask *Allocation) *AllocationResult {
+func (sa *Application) tryNode(node *Node, ask *Allocation)
(*AllocationResult, error) {
toAllocate := ask.GetAllocatedResource()
// create the key for the reservation
if !node.preAllocateCheck(toAllocate, reservationKey(nil, sa, ask)) {
// skip schedule onto node
- return nil
+ return nil, nil
}
// skip the node if conditions can not be satisfied
- if !node.preAllocateConditions(ask) {
- return nil
+ if err := node.preAllocateConditions(ask); err != nil {
+ return nil, err
}
// everything OK really allocate
@@ -1511,7 +1525,7 @@ func (sa *Application) tryNode(node *Node, ask *Allocation) *AllocationResult {
zap.Error(err))
// revert the node update
node.RemoveAllocation(ask.GetAllocationKey())
- return nil
+ return nil, nil
}
// mark this ask as allocated
_, err := sa.allocateAsk(ask)
@@ -1522,9 +1536,9 @@ func (sa *Application) tryNode(node *Node, ask *Allocation) *AllocationResult {
// all is OK, last update for the app
result := newAllocatedAllocationResult(node.NodeID, ask)
sa.addAllocationInternal(result.ResultType, ask)
- return result
+ return result, nil
}
- return nil
+ return nil, nil
}
func (sa *Application) GetQueuePath() string {
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 3f1d6bb5..a58db862 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -33,6 +33,8 @@ import (
"github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-core/pkg/events/mock"
"github.com/apache/yunikorn-core/pkg/handler"
+ mockCommon "github.com/apache/yunikorn-core/pkg/mock"
+ "github.com/apache/yunikorn-core/pkg/plugins"
"github.com/apache/yunikorn-core/pkg/rmproxy"
"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events"
@@ -2668,6 +2670,53 @@ func TestUpdateRunnableStatus(t *testing.T) {
assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUOTA,
eventSystem.Events[1].EventChangeDetail)
}
+func TestPredicateFailedEvents(t *testing.T) {
+ setupUGM()
+
+ res, err := resources.NewResourceFromConf(map[string]string{"first":
"1"})
+ assert.NilError(t, err)
+ headroom, err :=
resources.NewResourceFromConf(map[string]string{"first": "40"})
+ assert.NilError(t, err)
+ ask := newAllocationAsk("alloc-0", "app-1", res)
+ app := newApplication(appID1, "default", "root")
+ eventSystem := mock.NewEventSystem()
+ ask.askEvents = schedEvt.NewAskEvents(eventSystem)
+ app.disableStateChangeEvents()
+ app.resetAppEvents()
+ queue, err := createRootQueue(nil)
+ assert.NilError(t, err, "queue create failed")
+ app.queue = queue
+ sr := sortedRequests{}
+ sr.insert(ask)
+ app.sortedRequests = sr
+ attempts := 0
+
+ mockPlugin := mockCommon.NewPredicatePlugin(true, nil)
+ plugins.RegisterSchedulerPlugin(mockPlugin)
+ defer plugins.UnregisterSchedulerPlugins()
+
+ app.tryAllocate(headroom, false, time.Second, &attempts, func()
NodeIterator {
+ return &testIterator{}
+ }, nilNodeIterator, nilGetNode)
+ assert.Equal(t, 1, len(eventSystem.Events))
+ event := eventSystem.Events[0]
+ assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+ assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+ assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+ assert.Equal(t, "app-1", event.ReferenceID)
+ assert.Equal(t, "alloc-0", event.ObjectID)
+ assert.Equal(t, "Unschedulable request 'alloc-0': fake predicate plugin
failed (2x); ", event.Message)
+}
+
+type testIterator struct{}
+
+func (testIterator) ForEachNode(fn func(*Node) bool) {
+ node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 20})
+ node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 20})
+ fn(node1)
+ fn(node2)
+}
+
func TestGetMaxResourceFromTag(t *testing.T) {
app := newApplication(appID0, "default", "root.unknown")
testGetResourceFromTag(t, siCommon.AppTagNamespaceResourceQuota,
app.tags, app.GetMaxResource)
diff --git a/pkg/scheduler/objects/events/ask_events.go b/pkg/scheduler/objects/events/ask_events.go
index 7a7e4095..64265c6d 100644
--- a/pkg/scheduler/objects/events/ask_events.go
+++ b/pkg/scheduler/objects/events/ask_events.go
@@ -20,6 +20,8 @@ package events
import (
"fmt"
+ "sort"
+ "strconv"
"time"
"golang.org/x/time/rate"
@@ -70,11 +72,24 @@ func (ae *AskEvents) SendRequestFitsInUserQuota(allocKey, appID string, allocate
ae.eventSystem.AddEvent(event)
}
-func (ae *AskEvents) SendPredicateFailed(allocKey, appID, predicateMsg string,
allocatedResource *resources.Resource) {
+func (ae *AskEvents) SendPredicatesFailed(allocKey, appID string,
predicateErrors map[string]int, allocatedResource *resources.Resource) {
if !ae.eventSystem.IsEventTrackingEnabled() || !ae.limiter.Allow() {
return
}
- message := fmt.Sprintf("Predicate failed for request '%s' with message:
'%s'", allocKey, predicateMsg)
+
+ messages := make([]string, 0, len(predicateErrors))
+ for k := range predicateErrors {
+ messages = append(messages, k)
+ }
+ sort.Strings(messages) // make sure we always have the same string
regardless of iteration order
+
+ var failures string
+ for _, m := range messages {
+ times := strconv.Itoa(predicateErrors[m])
+ failures = failures + m + " (" + times + "x); " // example:
"node(s) had taints that the pod didn't tolerate (5x);"
+ }
+
+ message := fmt.Sprintf("Unschedulable request '%s': %s", allocKey,
failures)
event := events.CreateRequestEventRecord(allocKey, appID, message,
allocatedResource)
ae.eventSystem.AddEvent(event)
}
diff --git a/pkg/scheduler/objects/events/ask_events_test.go b/pkg/scheduler/objects/events/ask_events_test.go
index a3a661ef..add2fc53 100644
--- a/pkg/scheduler/objects/events/ask_events_test.go
+++ b/pkg/scheduler/objects/events/ask_events_test.go
@@ -19,7 +19,6 @@
package events
import (
- "strconv"
"testing"
"time"
@@ -117,24 +116,29 @@ func TestPredicateFailedEvents(t *testing.T) {
resource :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
events := NewAskEvents(eventSystem)
- events.SendPredicateFailed("alloc-0", "app-0", "failed", resource)
+ events.SendPredicatesFailed("alloc-0", "app-0", map[string]int{},
resource)
assert.Equal(t, 0, len(eventSystem.Events))
eventSystem = mock.NewEventSystem()
events = newAskEventsWithRate(eventSystem, 50*time.Millisecond, 1)
+ errors := map[string]int{
+ "error#0": 2,
+ "error#1": 123,
+ "error#2": 44,
+ }
// only the first event is expected to be emitted due to rate limiting
for i := 0; i < 200; i++ {
- events.SendPredicateFailed("alloc-0", "app-0",
"failure-"+strconv.FormatUint(uint64(i), 10), resource)
+ events.SendPredicatesFailed("alloc-0", "app-0", errors,
resource)
}
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
- assert.Equal(t, "Predicate failed for request 'alloc-0' with message:
'failure-0'", event.Message)
+ assert.Equal(t, "Unschedulable request 'alloc-0': error#0 (2x); error#1
(123x); error#2 (44x); ", event.Message)
eventSystem.Reset()
// wait a bit, a new event is expected
time.Sleep(100 * time.Millisecond)
- events.SendPredicateFailed("alloc-0", "app-0", "failed", resource)
+ events.SendPredicatesFailed("alloc-0", "app-0", errors, resource)
assert.Equal(t, 1, len(eventSystem.Events))
event = eventSystem.Events[0]
- assert.Equal(t, "Predicate failed for request 'alloc-0' with message:
'failed'", event.Message)
+ assert.Equal(t, "Unschedulable request 'alloc-0': error#0 (2x); error#1
(123x); error#2 (44x); ", event.Message)
}
diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go
index 008d358a..196e6995 100644
--- a/pkg/scheduler/objects/node.go
+++ b/pkg/scheduler/objects/node.go
@@ -367,12 +367,12 @@ func (sn *Node) CanAllocate(res *resources.Resource) bool {
}
// Checking pre-conditions in the shim for an allocation.
-func (sn *Node) preAllocateConditions(ask *Allocation) bool {
+func (sn *Node) preAllocateConditions(ask *Allocation) error {
return sn.preConditions(ask, true)
}
// Checking pre-conditions in the shim for a reservation.
-func (sn *Node) preReserveConditions(ask *Allocation) bool {
+func (sn *Node) preReserveConditions(ask *Allocation) error {
return sn.preConditions(ask, false)
}
@@ -382,7 +382,7 @@ func (sn *Node) preReserveConditions(ask *Allocation) bool {
// The caller must thus not rely on all plugins being executed.
// This is a lock free call as it does not change the node and multiple
predicate checks could be
// run at the same time.
-func (sn *Node) preConditions(ask *Allocation, allocate bool) bool {
+func (sn *Node) preConditions(ask *Allocation, allocate bool) error {
// Check the predicates plugin (k8shim)
allocationKey := ask.GetAllocationKey()
if plugin := plugins.GetResourceManagerCallbackPlugin(); plugin != nil {
@@ -400,12 +400,11 @@ func (sn *Node) preConditions(ask *Allocation, allocate bool) bool {
// running predicates failed
msg := err.Error()
ask.LogAllocationFailure(msg, allocate)
- ask.SendPredicateFailedEvent(msg)
- return false
+ return err
}
}
// all predicate plugins passed
- return true
+ return nil
}
// preAllocateCheck checks if the node should be considered as a possible node
to allocate on.
diff --git a/pkg/scheduler/objects/node_test.go b/pkg/scheduler/objects/node_test.go
index 274fb8a0..b09a5d67 100644
--- a/pkg/scheduler/objects/node_test.go
+++ b/pkg/scheduler/objects/node_test.go
@@ -104,7 +104,7 @@ func TestCheckConditions(t *testing.T) {
// Check if we can allocate on scheduling node (no plugins)
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
ask := newAllocationAsk("test", "app001", res)
- if !node.preAllocateConditions(ask) {
+ if node.preAllocateConditions(ask) != nil {
t.Error("node with scheduling set to true no plugins should
allow allocation")
}
}
@@ -770,10 +770,7 @@ func TestNode_FitInNode(t *testing.T) {
}
func TestPreconditions(t *testing.T) {
- current := plugins.GetResourceManagerCallbackPlugin()
- defer func() {
- plugins.RegisterSchedulerPlugin(current)
- }()
+ defer plugins.UnregisterSchedulerPlugins()
plugins.RegisterSchedulerPlugin(mock.NewPredicatePlugin(true,
map[string]int{}))
total :=
resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 100,
"memory": 100})
@@ -783,21 +780,17 @@ func TestPreconditions(t *testing.T) {
})
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
ask := newAllocationAsk("test", "app001", res)
- eventSystem := evtMock.NewEventSystem()
- ask.askEvents = schedEvt.NewAskEvents(eventSystem)
node := NewNode(proto)
// failure
- node.preConditions(ask, true)
- assert.Equal(t, 1, len(eventSystem.Events))
- assert.Equal(t, "Predicate failed for request 'test' with message:
'fake predicate plugin failed'", eventSystem.Events[0].Message)
+ err := node.preConditions(ask, true)
+ assert.ErrorContains(t, err, "fake predicate plugin failed")
assert.Equal(t, 1, len(ask.allocLog))
assert.Equal(t, "fake predicate plugin failed", ask.allocLog["fake
predicate plugin failed"].Message)
// pass
- eventSystem.Reset()
plugins.RegisterSchedulerPlugin(mock.NewPredicatePlugin(false,
map[string]int{}))
- node.preConditions(ask, true)
- assert.Equal(t, 0, len(eventSystem.Events))
+ err = node.preConditions(ask, true)
+ assert.NilError(t, err)
assert.Equal(t, 1, len(ask.allocLog))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]