This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 0792325c [YUNIKORN-2766] Only generate event if all predicates failed (#919)
0792325c is described below

commit 0792325c9cb88ee146831fa781f5142b8a951b8a
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Jul 31 09:40:11 2024 +0200

    [YUNIKORN-2766] Only generate event if all predicates failed (#919)
    
    Closes: #919
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/scheduler/objects/allocation.go             |  7 ++--
 pkg/scheduler/objects/allocation_test.go        |  8 ++--
 pkg/scheduler/objects/application.go            | 42 ++++++++++++++-------
 pkg/scheduler/objects/application_test.go       | 49 +++++++++++++++++++++++++
 pkg/scheduler/objects/events/ask_events.go      | 19 +++++++++-
 pkg/scheduler/objects/events/ask_events_test.go | 16 +++++---
 pkg/scheduler/objects/node.go                   | 11 +++---
 pkg/scheduler/objects/node_test.go              | 19 +++-------
 8 files changed, 124 insertions(+), 47 deletions(-)

diff --git a/pkg/scheduler/objects/allocation.go b/pkg/scheduler/objects/allocation.go
index 6baf925e..f75039bc 100644
--- a/pkg/scheduler/objects/allocation.go
+++ b/pkg/scheduler/objects/allocation.go
@@ -489,9 +489,10 @@ func (a *Allocation) LogAllocationFailure(message string, 
allocate bool) {
        entry.Count++
 }
 
-// SendPredicateFailedEvent updates the event system with the reason for a 
predicate failure.
-func (a *Allocation) SendPredicateFailedEvent(message string) {
-       a.askEvents.SendPredicateFailed(a.allocationKey, a.applicationID, 
message, a.GetAllocatedResource())
+// SendPredicatesFailedEvent updates the event system with the reason for 
predicate failures.
+// The map predicateErrors contains how many times certain predicates failed 
in the scheduling cycle for this ask.
+func (a *Allocation) SendPredicatesFailedEvent(predicateErrors map[string]int) 
{
+       a.askEvents.SendPredicatesFailed(a.allocationKey, a.applicationID, 
predicateErrors, a.GetAllocatedResource())
 }
 
 // GetAllocationLog returns a list of log entries corresponding to allocation 
preconditions not being met.
diff --git a/pkg/scheduler/objects/allocation_test.go b/pkg/scheduler/objects/allocation_test.go
index fd513d57..3fe805c9 100644
--- a/pkg/scheduler/objects/allocation_test.go
+++ b/pkg/scheduler/objects/allocation_test.go
@@ -210,15 +210,17 @@ func TestSendPredicateFailed(t *testing.T) {
        ask := NewAllocationAskFromSI(siAsk)
        eventSystem := mock.NewEventSystemDisabled()
        ask.askEvents = schedEvt.NewAskEvents(eventSystem)
-       ask.SendPredicateFailedEvent("failed")
+       ask.SendPredicatesFailedEvent(map[string]int{})
        assert.Equal(t, 0, len(eventSystem.Events))
 
        eventSystem = mock.NewEventSystem()
        ask.askEvents = schedEvt.NewAskEvents(eventSystem)
-       ask.SendPredicateFailedEvent("failure")
+       ask.SendPredicatesFailedEvent(map[string]int{
+               "failure": 1,
+       })
        assert.Equal(t, 1, len(eventSystem.Events))
        event := eventSystem.Events[0]
-       assert.Equal(t, "Predicate failed for request 'ask-1' with message: 
'failure'", event.Message)
+       assert.Equal(t, "Unschedulable request 'ask-1': failure (1x); ", 
event.Message)
 }
 
 func TestCreateTime(t *testing.T) {
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index b5a5bf06..563db8f4 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -1013,7 +1013,8 @@ func (sa *Application) tryAllocate(headRoom 
*resources.Resource, allowPreemption
                                        return nil
                                }
                        }
-                       result := sa.tryNode(node, request)
+                       // we don't care about predicate error messages here
+                       result, _ := sa.tryNode(node, request) //nolint:errcheck
                        if result != nil {
                                // check if the node was reserved and we 
allocated after a release
                                if _, ok := 
sa.reservations[reservationKey(node, nil, request)]; ok {
@@ -1157,7 +1158,7 @@ func (sa *Application) 
tryPlaceholderAllocate(nodeIterator func() NodeIterator,
                        node := getNodeFn(ph.GetNodeID())
                        // got the node run same checks as for reservation (all 
but fits)
                        // resource usage should not change anyway between 
placeholder and real one at this point
-                       if node != nil && node.preReserveConditions(request) {
+                       if node != nil && node.preReserveConditions(request) == 
nil {
                                _, err := sa.allocateAsk(request)
                                if err != nil {
                                        
log.Log(log.SchedApplication).Warn("allocation of ask failed unexpectedly",
@@ -1198,7 +1199,7 @@ func (sa *Application) 
tryPlaceholderAllocate(nodeIterator func() NodeIterator,
                                return true
                        }
                        // skip the node if conditions can not be satisfied
-                       if !node.preAllocateConditions(reqFit) {
+                       if err := node.preAllocateConditions(reqFit); err != 
nil {
                                return true
                        }
                        // update just the node to make sure we keep its spot
@@ -1285,7 +1286,8 @@ func (sa *Application) tryReservedAllocate(headRoom 
*resources.Resource, nodeIte
                        }
                }
                // check allocation possibility
-               result := sa.tryNode(reserve.node, ask)
+               // we don't care about predicate error messages here
+               result, _ := sa.tryNode(reserve.node, ask) //nolint:errcheck
 
                // allocation worked fix the resultType and return
                if result != nil {
@@ -1379,7 +1381,8 @@ func (sa *Application) tryNodesNoReserve(ask *Allocation, 
iterator NodeIterator,
                if !node.FitInNode(ask.GetAllocatedResource()) || node.NodeID 
== reservedNode {
                        return true
                }
-               result := sa.tryNode(node, ask)
+               // we don't care about predicate error messages here
+               result, _ := sa.tryNode(node, ask) //nolint:errcheck
                // allocation worked: update resultType and return
                if result != nil {
                        result.ResultType = AllocatedReserved
@@ -1404,6 +1407,7 @@ func (sa *Application) tryNodes(ask *Allocation, iterator 
NodeIterator) *Allocat
        reservedAsks := sa.GetAskReservations(allocKey)
        allowReserve := !ask.IsAllocated() && len(reservedAsks) == 0
        var allocResult *AllocationResult
+       var predicateErrors map[string]int
        iterator.ForEachNode(func(node *Node) bool {
                // skip the node if the node is not valid for the ask
                if !node.IsSchedulable() {
@@ -1417,7 +1421,13 @@ func (sa *Application) tryNodes(ask *Allocation, 
iterator NodeIterator) *Allocat
                        return true
                }
                tryNodeStart := time.Now()
-               result := sa.tryNode(node, ask)
+               result, err := sa.tryNode(node, ask)
+               if err != nil {
+                       if predicateErrors == nil {
+                               predicateErrors = make(map[string]int)
+                       }
+                       predicateErrors[err.Error()]++
+               }
                // allocation worked so return
                if result != nil {
                        
metrics.GetSchedulerMetrics().ObserveTryNodeLatency(tryNodeStart)
@@ -1472,6 +1482,10 @@ func (sa *Application) tryNodes(ask *Allocation, 
iterator NodeIterator) *Allocat
                return allocResult
        }
 
+       if predicateErrors != nil {
+               ask.SendPredicatesFailedEvent(predicateErrors)
+       }
+
        // we have not allocated yet, check if we should reserve
        // NOTE: the node should not be reserved as the iterator filters them 
but we do not lock the nodes
        if nodeToReserve != nil && !nodeToReserve.IsReserved() {
@@ -1481,7 +1495,7 @@ func (sa *Application) tryNodes(ask *Allocation, iterator 
NodeIterator) *Allocat
                        zap.String("allocationKey", allocKey),
                        zap.Int("reservations", len(reservedAsks)))
                // skip the node if conditions can not be satisfied
-               if !nodeToReserve.preReserveConditions(ask) {
+               if nodeToReserve.preReserveConditions(ask) != nil {
                        return nil
                }
                // return reservation allocation and mark it as a reservation
@@ -1492,16 +1506,16 @@ func (sa *Application) tryNodes(ask *Allocation, 
iterator NodeIterator) *Allocat
 }
 
 // Try allocating on one specific node
-func (sa *Application) tryNode(node *Node, ask *Allocation) *AllocationResult {
+func (sa *Application) tryNode(node *Node, ask *Allocation) 
(*AllocationResult, error) {
        toAllocate := ask.GetAllocatedResource()
        // create the key for the reservation
        if !node.preAllocateCheck(toAllocate, reservationKey(nil, sa, ask)) {
                // skip schedule onto node
-               return nil
+               return nil, nil
        }
        // skip the node if conditions can not be satisfied
-       if !node.preAllocateConditions(ask) {
-               return nil
+       if err := node.preAllocateConditions(ask); err != nil {
+               return nil, err
        }
 
        // everything OK really allocate
@@ -1511,7 +1525,7 @@ func (sa *Application) tryNode(node *Node, ask 
*Allocation) *AllocationResult {
                                zap.Error(err))
                        // revert the node update
                        node.RemoveAllocation(ask.GetAllocationKey())
-                       return nil
+                       return nil, nil
                }
                // mark this ask as allocated
                _, err := sa.allocateAsk(ask)
@@ -1522,9 +1536,9 @@ func (sa *Application) tryNode(node *Node, ask 
*Allocation) *AllocationResult {
                // all is OK, last update for the app
                result := newAllocatedAllocationResult(node.NodeID, ask)
                sa.addAllocationInternal(result.ResultType, ask)
-               return result
+               return result, nil
        }
-       return nil
+       return nil, nil
 }
 
 func (sa *Application) GetQueuePath() string {
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 3f1d6bb5..a58db862 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -33,6 +33,8 @@ import (
        "github.com/apache/yunikorn-core/pkg/events"
        "github.com/apache/yunikorn-core/pkg/events/mock"
        "github.com/apache/yunikorn-core/pkg/handler"
+       mockCommon "github.com/apache/yunikorn-core/pkg/mock"
+       "github.com/apache/yunikorn-core/pkg/plugins"
        "github.com/apache/yunikorn-core/pkg/rmproxy"
        "github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
        schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events"
@@ -2668,6 +2670,53 @@ func TestUpdateRunnableStatus(t *testing.T) {
        assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUOTA, 
eventSystem.Events[1].EventChangeDetail)
 }
 
+func TestPredicateFailedEvents(t *testing.T) {
+       setupUGM()
+
+       res, err := resources.NewResourceFromConf(map[string]string{"first": 
"1"})
+       assert.NilError(t, err)
+       headroom, err := 
resources.NewResourceFromConf(map[string]string{"first": "40"})
+       assert.NilError(t, err)
+       ask := newAllocationAsk("alloc-0", "app-1", res)
+       app := newApplication(appID1, "default", "root")
+       eventSystem := mock.NewEventSystem()
+       ask.askEvents = schedEvt.NewAskEvents(eventSystem)
+       app.disableStateChangeEvents()
+       app.resetAppEvents()
+       queue, err := createRootQueue(nil)
+       assert.NilError(t, err, "queue create failed")
+       app.queue = queue
+       sr := sortedRequests{}
+       sr.insert(ask)
+       app.sortedRequests = sr
+       attempts := 0
+
+       mockPlugin := mockCommon.NewPredicatePlugin(true, nil)
+       plugins.RegisterSchedulerPlugin(mockPlugin)
+       defer plugins.UnregisterSchedulerPlugins()
+
+       app.tryAllocate(headroom, false, time.Second, &attempts, func() 
NodeIterator {
+               return &testIterator{}
+       }, nilNodeIterator, nilGetNode)
+       assert.Equal(t, 1, len(eventSystem.Events))
+       event := eventSystem.Events[0]
+       assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+       assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+       assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+       assert.Equal(t, "app-1", event.ReferenceID)
+       assert.Equal(t, "alloc-0", event.ObjectID)
+       assert.Equal(t, "Unschedulable request 'alloc-0': fake predicate plugin 
failed (2x); ", event.Message)
+}
+
+type testIterator struct{}
+
+func (testIterator) ForEachNode(fn func(*Node) bool) {
+       node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 20})
+       node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 20})
+       fn(node1)
+       fn(node2)
+}
+
 func TestGetMaxResourceFromTag(t *testing.T) {
        app := newApplication(appID0, "default", "root.unknown")
        testGetResourceFromTag(t, siCommon.AppTagNamespaceResourceQuota, 
app.tags, app.GetMaxResource)
diff --git a/pkg/scheduler/objects/events/ask_events.go b/pkg/scheduler/objects/events/ask_events.go
index 7a7e4095..64265c6d 100644
--- a/pkg/scheduler/objects/events/ask_events.go
+++ b/pkg/scheduler/objects/events/ask_events.go
@@ -20,6 +20,8 @@ package events
 
 import (
        "fmt"
+       "sort"
+       "strconv"
        "time"
 
        "golang.org/x/time/rate"
@@ -70,11 +72,24 @@ func (ae *AskEvents) SendRequestFitsInUserQuota(allocKey, 
appID string, allocate
        ae.eventSystem.AddEvent(event)
 }
 
-func (ae *AskEvents) SendPredicateFailed(allocKey, appID, predicateMsg string, 
allocatedResource *resources.Resource) {
+func (ae *AskEvents) SendPredicatesFailed(allocKey, appID string, 
predicateErrors map[string]int, allocatedResource *resources.Resource) {
        if !ae.eventSystem.IsEventTrackingEnabled() || !ae.limiter.Allow() {
                return
        }
-       message := fmt.Sprintf("Predicate failed for request '%s' with message: 
'%s'", allocKey, predicateMsg)
+
+       messages := make([]string, 0, len(predicateErrors))
+       for k := range predicateErrors {
+               messages = append(messages, k)
+       }
+       sort.Strings(messages) // make sure we always have the same string 
regardless of iteration order
+
+       var failures string
+       for _, m := range messages {
+               times := strconv.Itoa(predicateErrors[m])
+               failures = failures + m + " (" + times + "x); " // example: 
"node(s) had taints that the pod didn't tolerate (5x);"
+       }
+
+       message := fmt.Sprintf("Unschedulable request '%s': %s", allocKey, 
failures)
        event := events.CreateRequestEventRecord(allocKey, appID, message, 
allocatedResource)
        ae.eventSystem.AddEvent(event)
 }
diff --git a/pkg/scheduler/objects/events/ask_events_test.go b/pkg/scheduler/objects/events/ask_events_test.go
index a3a661ef..add2fc53 100644
--- a/pkg/scheduler/objects/events/ask_events_test.go
+++ b/pkg/scheduler/objects/events/ask_events_test.go
@@ -19,7 +19,6 @@
 package events
 
 import (
-       "strconv"
        "testing"
        "time"
 
@@ -117,24 +116,29 @@ func TestPredicateFailedEvents(t *testing.T) {
        resource := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
        eventSystem := mock.NewEventSystemDisabled()
        events := NewAskEvents(eventSystem)
-       events.SendPredicateFailed("alloc-0", "app-0", "failed", resource)
+       events.SendPredicatesFailed("alloc-0", "app-0", map[string]int{}, 
resource)
        assert.Equal(t, 0, len(eventSystem.Events))
 
        eventSystem = mock.NewEventSystem()
        events = newAskEventsWithRate(eventSystem, 50*time.Millisecond, 1)
+       errors := map[string]int{
+               "error#0": 2,
+               "error#1": 123,
+               "error#2": 44,
+       }
        // only the first event is expected to be emitted due to rate limiting
        for i := 0; i < 200; i++ {
-               events.SendPredicateFailed("alloc-0", "app-0", 
"failure-"+strconv.FormatUint(uint64(i), 10), resource)
+               events.SendPredicatesFailed("alloc-0", "app-0", errors, 
resource)
        }
        assert.Equal(t, 1, len(eventSystem.Events))
        event := eventSystem.Events[0]
-       assert.Equal(t, "Predicate failed for request 'alloc-0' with message: 
'failure-0'", event.Message)
+       assert.Equal(t, "Unschedulable request 'alloc-0': error#0 (2x); error#1 
(123x); error#2 (44x); ", event.Message)
 
        eventSystem.Reset()
        // wait a bit, a new event is expected
        time.Sleep(100 * time.Millisecond)
-       events.SendPredicateFailed("alloc-0", "app-0", "failed", resource)
+       events.SendPredicatesFailed("alloc-0", "app-0", errors, resource)
        assert.Equal(t, 1, len(eventSystem.Events))
        event = eventSystem.Events[0]
-       assert.Equal(t, "Predicate failed for request 'alloc-0' with message: 
'failed'", event.Message)
+       assert.Equal(t, "Unschedulable request 'alloc-0': error#0 (2x); error#1 
(123x); error#2 (44x); ", event.Message)
 }
diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go
index 008d358a..196e6995 100644
--- a/pkg/scheduler/objects/node.go
+++ b/pkg/scheduler/objects/node.go
@@ -367,12 +367,12 @@ func (sn *Node) CanAllocate(res *resources.Resource) bool 
{
 }
 
 // Checking pre-conditions in the shim for an allocation.
-func (sn *Node) preAllocateConditions(ask *Allocation) bool {
+func (sn *Node) preAllocateConditions(ask *Allocation) error {
        return sn.preConditions(ask, true)
 }
 
 // Checking pre-conditions in the shim for a reservation.
-func (sn *Node) preReserveConditions(ask *Allocation) bool {
+func (sn *Node) preReserveConditions(ask *Allocation) error {
        return sn.preConditions(ask, false)
 }
 
@@ -382,7 +382,7 @@ func (sn *Node) preReserveConditions(ask *Allocation) bool {
 // The caller must thus not rely on all plugins being executed.
 // This is a lock free call as it does not change the node and multiple 
predicate checks could be
 // run at the same time.
-func (sn *Node) preConditions(ask *Allocation, allocate bool) bool {
+func (sn *Node) preConditions(ask *Allocation, allocate bool) error {
        // Check the predicates plugin (k8shim)
        allocationKey := ask.GetAllocationKey()
        if plugin := plugins.GetResourceManagerCallbackPlugin(); plugin != nil {
@@ -400,12 +400,11 @@ func (sn *Node) preConditions(ask *Allocation, allocate 
bool) bool {
                        // running predicates failed
                        msg := err.Error()
                        ask.LogAllocationFailure(msg, allocate)
-                       ask.SendPredicateFailedEvent(msg)
-                       return false
+                       return err
                }
        }
        // all predicate plugins passed
-       return true
+       return nil
 }
 
 // preAllocateCheck checks if the node should be considered as a possible node 
to allocate on.
diff --git a/pkg/scheduler/objects/node_test.go b/pkg/scheduler/objects/node_test.go
index 274fb8a0..b09a5d67 100644
--- a/pkg/scheduler/objects/node_test.go
+++ b/pkg/scheduler/objects/node_test.go
@@ -104,7 +104,7 @@ func TestCheckConditions(t *testing.T) {
        // Check if we can allocate on scheduling node (no plugins)
        res := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
        ask := newAllocationAsk("test", "app001", res)
-       if !node.preAllocateConditions(ask) {
+       if node.preAllocateConditions(ask) != nil {
                t.Error("node with scheduling set to true no plugins should 
allow allocation")
        }
 }
@@ -770,10 +770,7 @@ func TestNode_FitInNode(t *testing.T) {
 }
 
 func TestPreconditions(t *testing.T) {
-       current := plugins.GetResourceManagerCallbackPlugin()
-       defer func() {
-               plugins.RegisterSchedulerPlugin(current)
-       }()
+       defer plugins.UnregisterSchedulerPlugins()
 
        plugins.RegisterSchedulerPlugin(mock.NewPredicatePlugin(true, 
map[string]int{}))
        total := 
resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 100, 
"memory": 100})
@@ -783,21 +780,17 @@ func TestPreconditions(t *testing.T) {
        })
        res := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
        ask := newAllocationAsk("test", "app001", res)
-       eventSystem := evtMock.NewEventSystem()
-       ask.askEvents = schedEvt.NewAskEvents(eventSystem)
        node := NewNode(proto)
 
        // failure
-       node.preConditions(ask, true)
-       assert.Equal(t, 1, len(eventSystem.Events))
-       assert.Equal(t, "Predicate failed for request 'test' with message: 
'fake predicate plugin failed'", eventSystem.Events[0].Message)
+       err := node.preConditions(ask, true)
+       assert.ErrorContains(t, err, "fake predicate plugin failed")
        assert.Equal(t, 1, len(ask.allocLog))
        assert.Equal(t, "fake predicate plugin failed", ask.allocLog["fake 
predicate plugin failed"].Message)
 
        // pass
-       eventSystem.Reset()
        plugins.RegisterSchedulerPlugin(mock.NewPredicatePlugin(false, 
map[string]int{}))
-       node.preConditions(ask, true)
-       assert.Equal(t, 0, len(eventSystem.Events))
+       err = node.preConditions(ask, true)
+       assert.NilError(t, err)
        assert.Equal(t, 1, len(ask.allocLog))
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to