This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new dc4f6efd [YUNIKORN-2370] Proper event handling for failed headroom 
checks (#784)
dc4f6efd is described below

commit dc4f6efddce376e103314325a7e9236924c83b4e
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Feb 7 11:46:28 2024 +0100

    [YUNIKORN-2370] Proper event handling for failed headroom checks (#784)
    
    Closes: #784
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/scheduler/objects/allocation_ask.go          |  47 +++++++
 pkg/scheduler/objects/allocation_ask_test.go     |  22 ++++
 pkg/scheduler/objects/application.go             |   6 +-
 pkg/scheduler/objects/application_events.go      |  10 --
 pkg/scheduler/objects/application_events_test.go |  44 +------
 pkg/scheduler/objects/application_test.go        | 128 ++++++++++++++----
 pkg/scheduler/objects/ask_events.go              |  93 +++++++++++++
 pkg/scheduler/objects/ask_events_test.go         | 161 +++++++++++++++++++++++
 pkg/scheduler/objects/node.go                    |   4 +-
 pkg/scheduler/objects/node_test.go               |  35 +++++
 10 files changed, 471 insertions(+), 79 deletions(-)

diff --git a/pkg/scheduler/objects/allocation_ask.go 
b/pkg/scheduler/objects/allocation_ask.go
index 7a42d04c..181e7f57 100644
--- a/pkg/scheduler/objects/allocation_ask.go
+++ b/pkg/scheduler/objects/allocation_ask.go
@@ -27,6 +27,7 @@ import (
 
        "github.com/apache/yunikorn-core/pkg/common"
        "github.com/apache/yunikorn-core/pkg/common/resources"
+       "github.com/apache/yunikorn-core/pkg/events"
        "github.com/apache/yunikorn-core/pkg/log"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
@@ -59,6 +60,10 @@ type AllocationAsk struct {
        scaleUpTriggered    bool              // whether this ask has triggered 
autoscaling or not
        resKeyPerNode       map[string]string // reservation key for a given 
node
 
+       askEvents            *askEvents
+       userQuotaCheckFailed bool
+       headroomCheckFailed  bool
+
        sync.RWMutex
 }
 
@@ -77,6 +82,7 @@ func NewAllocationAsk(allocationKey string, applicationID 
string, allocatedResou
                resKeyPerNode:     make(map[string]string),
        }
        aa.resKeyWithoutNode = reservationKeyWithoutNode(applicationID, 
allocationKey)
+       aa.askEvents = newAskEvents(aa, events.GetEventSystem())
        return aa
 }
 
@@ -110,6 +116,7 @@ func NewAllocationAskFromSI(ask *si.AllocationAsk) 
*AllocationAsk {
                return nil
        }
        saa.resKeyWithoutNode = reservationKeyWithoutNode(ask.ApplicationID, 
ask.AllocationKey)
+       saa.askEvents = newAskEvents(saa, events.GetEventSystem())
        return saa
 }
 
@@ -261,6 +268,10 @@ func (aa *AllocationAsk) LogAllocationFailure(message 
string, allocate bool) {
        entry.Count++
 }
 
+func (aa *AllocationAsk) SendPredicateFailedEvent(message string) {
+       aa.askEvents.sendPredicateFailed(message)
+}
+
 // GetAllocationLog returns a list of log entries corresponding to allocation 
preconditions not being met
 func (aa *AllocationAsk) GetAllocationLog() []*AllocationLogEntry {
        aa.RLock()
@@ -343,3 +354,39 @@ func (aa *AllocationAsk) getReservationKeyForNode(node 
string) string {
        defer aa.RUnlock()
        return aa.resKeyPerNode[node]
 }
+
+func (aa *AllocationAsk) setHeadroomCheckFailed(headroom *resources.Resource, 
queue string) {
+       aa.Lock()
+       defer aa.Unlock()
+       if !aa.headroomCheckFailed {
+               aa.headroomCheckFailed = true
+               aa.askEvents.sendRequestExceedsQueueHeadroom(headroom, queue)
+       }
+}
+
+func (aa *AllocationAsk) setHeadroomCheckPassed(queue string) {
+       aa.Lock()
+       defer aa.Unlock()
+       if aa.headroomCheckFailed {
+               aa.headroomCheckFailed = false
+               aa.askEvents.sendRequestFitsInQueue(queue)
+       }
+}
+
+func (aa *AllocationAsk) setUserQuotaCheckFailed(available 
*resources.Resource) {
+       aa.Lock()
+       defer aa.Unlock()
+       if !aa.userQuotaCheckFailed {
+               aa.userQuotaCheckFailed = true
+               aa.askEvents.sendRequestExceedsUserQuota(available)
+       }
+}
+
+func (aa *AllocationAsk) setUserQuotaCheckPassed() {
+       aa.Lock()
+       defer aa.Unlock()
+       if aa.userQuotaCheckFailed {
+               aa.userQuotaCheckFailed = false
+               aa.askEvents.sendRequestFitsInUserQuota()
+       }
+}
diff --git a/pkg/scheduler/objects/allocation_ask_test.go 
b/pkg/scheduler/objects/allocation_ask_test.go
index df49e87d..e10e925e 100644
--- a/pkg/scheduler/objects/allocation_ask_test.go
+++ b/pkg/scheduler/objects/allocation_ask_test.go
@@ -229,6 +229,28 @@ func TestAllocationLog(t *testing.T) {
        assert.Equal(t, 1, int(log[1].Count), "wrong count for event 2")
 }
 
+func TestSendPredicateFailed(t *testing.T) {
+       res := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+       siAsk := &si.AllocationAsk{
+               AllocationKey:  "ask-1",
+               ApplicationID:  "app-1",
+               MaxAllocations: 1,
+               ResourceAsk:    res.ToProto(),
+       }
+       ask := NewAllocationAskFromSI(siAsk)
+       eventSystem := newEventSystemMockDisabled()
+       ask.askEvents = newAskEvents(ask, eventSystem)
+       ask.SendPredicateFailedEvent("failed")
+       assert.Equal(t, 0, len(eventSystem.events))
+
+       eventSystem = newEventSystemMock()
+       ask.askEvents = newAskEvents(ask, eventSystem)
+       ask.SendPredicateFailedEvent("failure")
+       assert.Equal(t, 1, len(eventSystem.events))
+       event := eventSystem.events[0]
+       assert.Equal(t, "Predicate failed for request 'ask-1' with message: 
'failure'", event.Message)
+}
+
 func sortedLog(ask *AllocationAsk) []*AllocationLogEntry {
        log := ask.GetAllocationLog()
        sort.SliceStable(log, func(i int, j int) bool {
diff --git a/pkg/scheduler/objects/application.go 
b/pkg/scheduler/objects/application.go
index 17fb264a..d26634da 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -955,9 +955,10 @@ func (sa *Application) tryAllocate(headRoom 
*resources.Resource, allowPreemption
                // as the preempted allocation must be for the same user in a 
different queue in the hierarchy...
                if !userHeadroom.FitInMaxUndef(request.GetAllocatedResource()) {
                        request.LogAllocationFailure(NotEnoughUserQuota, true) 
// error message MUST be constant!
+                       request.setUserQuotaCheckFailed(userHeadroom)
                        continue
                }
-
+               request.setUserQuotaCheckPassed()
                request.SetSchedulingAttempted(true)
 
                // resource must fit in headroom otherwise skip the request 
(unless preemption could help)
@@ -973,10 +974,11 @@ func (sa *Application) tryAllocate(headRoom 
*resources.Resource, allowPreemption
                                        }
                                }
                        }
-                       sa.appEvents.sendAppDoesNotFitEvent(request, headRoom)
                        request.LogAllocationFailure(NotEnoughQueueQuota, true) 
// error message MUST be constant!
+                       request.setHeadroomCheckFailed(headRoom, sa.queuePath)
                        continue
                }
+               request.setHeadroomCheckPassed(sa.queuePath)
 
                requiredNode := request.GetRequiredNode()
                // does request have any constraint to run on specific node?
diff --git a/pkg/scheduler/objects/application_events.go 
b/pkg/scheduler/objects/application_events.go
index 6f209826..87b0a7b3 100644
--- a/pkg/scheduler/objects/application_events.go
+++ b/pkg/scheduler/objects/application_events.go
@@ -25,7 +25,6 @@ import (
        "golang.org/x/time/rate"
 
        "github.com/apache/yunikorn-core/pkg/common"
-       "github.com/apache/yunikorn-core/pkg/common/resources"
        "github.com/apache/yunikorn-core/pkg/events"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
@@ -36,15 +35,6 @@ type applicationEvents struct {
        limiter     *rate.Limiter
 }
 
-func (evt *applicationEvents) sendAppDoesNotFitEvent(request *AllocationAsk, 
headroom *resources.Resource) {
-       if !evt.eventSystem.IsEventTrackingEnabled() || !evt.limiter.Allow() {
-               return
-       }
-       message := fmt.Sprintf("Application %s does not fit into %s queue 
(request resoure %s, headroom %s)", request.GetApplicationID(), 
evt.app.queuePath, request.GetAllocatedResource(), headroom)
-       event := events.CreateRequestEventRecord(request.GetAllocationKey(), 
request.GetApplicationID(), message, request.GetAllocatedResource())
-       evt.eventSystem.AddEvent(event)
-}
-
 func (evt *applicationEvents) sendPlaceholderLargerEvent(ph *Allocation, 
request *AllocationAsk) {
        if !evt.eventSystem.IsEventTrackingEnabled() {
                return
diff --git a/pkg/scheduler/objects/application_events_test.go 
b/pkg/scheduler/objects/application_events_test.go
index 827ccd02..c4d6d0e4 100644
--- a/pkg/scheduler/objects/application_events_test.go
+++ b/pkg/scheduler/objects/application_events_test.go
@@ -19,13 +19,10 @@
 package objects
 
 import (
-       "testing"
-       "time"
-
        "gotest.tools/v3/assert"
+       "testing"
 
        "github.com/apache/yunikorn-core/pkg/common"
-       "github.com/apache/yunikorn-core/pkg/common/resources"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
@@ -50,45 +47,6 @@ func isStateChangeEvent(t *testing.T, app *Application, 
changeDetail si.EventRec
        assert.Equal(t, changeDetail, record.EventChangeDetail, "incorrect 
change detail")
 }
 
-func TestSendAppDoesNotFitEvent(t *testing.T) {
-       app := &Application{
-               queuePath: "root.test",
-       }
-       mock := newEventSystemMockDisabled()
-       appEvents := newApplicationEvents(app, mock)
-       appEvents.sendAppDoesNotFitEvent(&AllocationAsk{}, 
&resources.Resource{})
-       assert.Equal(t, 0, len(mock.events), "unexpected event")
-
-       mock = newEventSystemMock()
-       appEvents = newApplicationEvents(app, mock)
-       appEvents.sendAppDoesNotFitEvent(&AllocationAsk{
-               applicationID: appID0,
-               allocationKey: aKey,
-       }, &resources.Resource{})
-       assert.Equal(t, 1, len(mock.events), "event was not generated")
-}
-
-func TestSendAppDoesNotFitEventWithRateLimiter(t *testing.T) {
-       app := &Application{
-               queuePath: "root.test",
-       }
-       mock := newEventSystemMock()
-       appEvents := newApplicationEvents(app, mock)
-       startTime := time.Now()
-       for {
-               elapsed := time.Since(startTime)
-               if elapsed > 500*time.Millisecond {
-                       break
-               }
-               appEvents.sendAppDoesNotFitEvent(&AllocationAsk{
-                       applicationID: appID0,
-                       allocationKey: aKey,
-               }, &resources.Resource{})
-               time.Sleep(10 * time.Millisecond)
-       }
-       assert.Equal(t, 1, len(mock.events), "event was not generated")
-}
-
 func TestSendPlaceholderLargerEvent(t *testing.T) {
        app := &Application{
                queuePath: "root.test",
diff --git a/pkg/scheduler/objects/application_test.go 
b/pkg/scheduler/objects/application_test.go
index b8286ff2..742f3ac8 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -2258,19 +2258,15 @@ func TestPlaceholderLargerEvent(t *testing.T) {
        assert.Equal(t, "app-1", records[3].ReferenceID)
 }
 
-func TestAppDoesNotFitEvent(t *testing.T) {
-       resMap := map[string]string{"memory": "100", "vcores": "10"}
-       res, err := resources.NewResourceFromConf(resMap)
-       assert.NilError(t, err, "failed to create resource with error")
-       headroomMap := map[string]string{"memory": "0", "vcores": "0"}
-       headroom, err := resources.NewResourceFromConf(headroomMap)
-       assert.NilError(t, err, "failed to create resource with error")
+func TestRequestDoesNotFitQueueEvents(t *testing.T) {
+       res, err := resources.NewResourceFromConf(map[string]string{"memory": 
"100", "vcores": "10"})
+       assert.NilError(t, err)
+       headroom, err := 
resources.NewResourceFromConf(map[string]string{"memory": "0", "vcores": "0"})
+       assert.NilError(t, err)
        ask := newAllocationAsk("alloc-0", "app-1", res)
        app := newApplication(appID1, "default", "root.default")
-       // Create event system after new application to avoid new application 
event.
-       events.Init()
-       eventSystem := events.GetEventSystem().(*events.EventSystemImpl) 
//nolint:errcheck
-       eventSystem.StartServiceWithPublisher(false)
+       eventSystem := newEventSystemMock()
+       ask.askEvents = newAskEvents(ask, eventSystem)
        app.disableStateChangeEvents()
        app.resetAppEvents()
        queue, err := createRootQueue(nil)
@@ -2281,20 +2277,106 @@ func TestAppDoesNotFitEvent(t *testing.T) {
        app.sortedRequests = sr
        attempts := 0
 
+       // try to allocate
+       app.tryAllocate(headroom, true, time.Second, &attempts, 
nilNodeIterator, nilNodeIterator, nilGetNode)
+       assert.Equal(t, 1, len(eventSystem.events))
+       event := eventSystem.events[0]
+       assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+       assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+       assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+       assert.Equal(t, "app-1", event.ReferenceID)
+       assert.Equal(t, "alloc-0", event.ObjectID)
+       assert.Equal(t, "Request 'alloc-0' does not fit in queue 'root.default' 
(requested map[memory:100 vcores:10], available map[memory:0 vcores:0])", 
event.Message)
+
+       // second attempt - no new event
        app.tryAllocate(headroom, true, time.Second, &attempts, 
nilNodeIterator, nilNodeIterator, nilGetNode)
+       assert.Equal(t, 1, len(eventSystem.events))
 
-       noEvents := 0
-       err = common.WaitFor(10*time.Millisecond, time.Second, func() bool {
-               noEvents = eventSystem.Store.CountStoredEvents()
-               return noEvents == 2
-       })
-       assert.NilError(t, err, "expected 2 event, got %d", noEvents)
-       records := eventSystem.Store.CollectEvents()
-       assert.Equal(t, si.EventRecord_REQUEST, records[1].Type)
-       assert.Equal(t, si.EventRecord_NONE, records[1].EventChangeType)
-       assert.Equal(t, si.EventRecord_DETAILS_NONE, 
records[1].EventChangeDetail)
-       assert.Equal(t, "app-1", records[1].ReferenceID)
-       assert.Equal(t, "alloc-0", records[1].ObjectID)
+       // third attempt with enough headroom - new event
+       eventSystem.Reset()
+       headroom, err = 
resources.NewResourceFromConf(map[string]string{"memory": "1000", "vcores": 
"1000"})
+       assert.NilError(t, err)
+       app.tryAllocate(headroom, true, time.Second, &attempts, 
nilNodeIterator, nilNodeIterator, nilGetNode)
+       assert.Equal(t, 1, len(eventSystem.events))
+       event = eventSystem.events[0]
+       assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+       assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+       assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+       assert.Equal(t, "app-1", event.ReferenceID)
+       assert.Equal(t, "alloc-0", event.ObjectID)
+       assert.Equal(t, "Request 'alloc-0' has become schedulable in queue 
'root.default'", event.Message)
+}
+
+func TestRequestDoesNotFitUserQuotaQueueEvents(t *testing.T) {
+       setupUGM()
+       // create config with resource limits for "testuser"
+       conf := configs.QueueConfig{
+               Name:      "root",
+               Parent:    true,
+               SubmitACL: "*",
+               Limits: []configs.Limit{
+                       {
+                               Limit: "leaf queue limit",
+                               Users: []string{
+                                       "testuser",
+                               },
+                               MaxResources: map[string]string{
+                                       "memory": "1",
+                                       "vcores": "1",
+                               },
+                       },
+               },
+       }
+       err := ugm.GetUserManager().UpdateConfig(conf, "root")
+       assert.NilError(t, err)
+
+       res, err := resources.NewResourceFromConf(map[string]string{"memory": 
"100", "vcores": "10"})
+       assert.NilError(t, err)
+       headroom, err := 
resources.NewResourceFromConf(map[string]string{"memory": "1000", "vcores": 
"1000"})
+       assert.NilError(t, err)
+       ask := newAllocationAsk("alloc-0", "app-1", res)
+       app := newApplication(appID1, "default", "root")
+       eventSystem := newEventSystemMock()
+       ask.askEvents = newAskEvents(ask, eventSystem)
+       app.disableStateChangeEvents()
+       app.resetAppEvents()
+       queue, err := createRootQueue(nil)
+       assert.NilError(t, err, "queue create failed")
+       app.queue = queue
+       sr := sortedRequests{}
+       sr.insert(ask)
+       app.sortedRequests = sr
+       attempts := 0
+
+       // try to allocate
+       app.tryAllocate(headroom, true, time.Second, &attempts, 
nilNodeIterator, nilNodeIterator, nilGetNode)
+       assert.Equal(t, 1, len(eventSystem.events))
+       event := eventSystem.events[0]
+       assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+       assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+       assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+       assert.Equal(t, "app-1", event.ReferenceID)
+       assert.Equal(t, "alloc-0", event.ObjectID)
+       assert.Equal(t, "Request 'alloc-0' exceeds the available user quota 
(requested map[memory:100 vcores:10], available map[memory:1 vcores:1])", 
event.Message)
+
+       // second attempt - no new event
+       app.tryAllocate(headroom, true, time.Second, &attempts, 
nilNodeIterator, nilNodeIterator, nilGetNode)
+       assert.Equal(t, 1, len(eventSystem.events))
+
+       // third attempt with the user limit removed (enough user quota) - new event
+       eventSystem.Reset()
+       conf.Limits[0].MaxResources = nil
+       err = ugm.GetUserManager().UpdateConfig(conf, "root")
+       assert.NilError(t, err)
+       app.tryAllocate(headroom, true, time.Second, &attempts, 
nilNodeIterator, nilNodeIterator, nilGetNode)
+       assert.Equal(t, 1, len(eventSystem.events))
+       event = eventSystem.events[0]
+       assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+       assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+       assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+       assert.Equal(t, "app-1", event.ReferenceID)
+       assert.Equal(t, "alloc-0", event.ObjectID)
+       assert.Equal(t, "Request 'alloc-0' fits in the available user quota", 
event.Message)
 }
 
 func TestAllocationFailures(t *testing.T) {
diff --git a/pkg/scheduler/objects/ask_events.go 
b/pkg/scheduler/objects/ask_events.go
new file mode 100644
index 00000000..3994ef33
--- /dev/null
+++ b/pkg/scheduler/objects/ask_events.go
@@ -0,0 +1,93 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+       "fmt"
+       "time"
+
+       "golang.org/x/time/rate"
+
+       "github.com/apache/yunikorn-core/pkg/common/resources"
+       "github.com/apache/yunikorn-core/pkg/events"
+)
+
+// Ask-specific events. These events are of REQUEST type, so they are 
eventually sent to the respective pods in K8s.
+type askEvents struct {
+       eventSystem events.EventSystem
+       ask         *AllocationAsk
+       limiter     *rate.Limiter
+}
+
+func (ae *askEvents) sendRequestExceedsQueueHeadroom(headroom 
*resources.Resource, queuePath string) {
+       if !ae.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       message := fmt.Sprintf("Request '%s' does not fit in queue '%s' 
(requested %s, available %s)", ae.ask.allocationKey, queuePath, 
ae.ask.GetAllocatedResource(), headroom)
+       event := events.CreateRequestEventRecord(ae.ask.allocationKey, 
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+       ae.eventSystem.AddEvent(event)
+}
+
+func (ae *askEvents) sendRequestFitsInQueue(queuePath string) {
+       if !ae.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       message := fmt.Sprintf("Request '%s' has become schedulable in queue 
'%s'", ae.ask.allocationKey, queuePath)
+       event := events.CreateRequestEventRecord(ae.ask.allocationKey, 
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+       ae.eventSystem.AddEvent(event)
+}
+
+func (ae *askEvents) sendRequestExceedsUserQuota(headroom *resources.Resource) 
{
+       if !ae.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       message := fmt.Sprintf("Request '%s' exceeds the available user quota 
(requested %s, available %s)", ae.ask.allocationKey, 
ae.ask.GetAllocatedResource(), headroom)
+       event := events.CreateRequestEventRecord(ae.ask.allocationKey, 
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+       ae.eventSystem.AddEvent(event)
+}
+
+func (ae *askEvents) sendRequestFitsInUserQuota() {
+       if !ae.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       message := fmt.Sprintf("Request '%s' fits in the available user quota", 
ae.ask.allocationKey)
+       event := events.CreateRequestEventRecord(ae.ask.allocationKey, 
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+       ae.eventSystem.AddEvent(event)
+}
+
+func (ae *askEvents) sendPredicateFailed(predicateMsg string) {
+       if !ae.eventSystem.IsEventTrackingEnabled() || !ae.limiter.Allow() {
+               return
+       }
+       message := fmt.Sprintf("Predicate failed for request '%s' with message: 
'%s'", ae.ask.allocationKey, predicateMsg)
+       event := events.CreateRequestEventRecord(ae.ask.allocationKey, 
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+       ae.eventSystem.AddEvent(event)
+}
+
+func newAskEvents(ask *AllocationAsk, evt events.EventSystem) *askEvents {
+       return newAskEventsWithRate(ask, evt, 15*time.Second, 1)
+}
+
+func newAskEventsWithRate(ask *AllocationAsk, evt events.EventSystem, interval 
time.Duration, burst int) *askEvents {
+       return &askEvents{
+               eventSystem: evt,
+               ask:         ask,
+               limiter:     rate.NewLimiter(rate.Every(interval), burst),
+       }
+}
diff --git a/pkg/scheduler/objects/ask_events_test.go 
b/pkg/scheduler/objects/ask_events_test.go
new file mode 100644
index 00000000..cffe4693
--- /dev/null
+++ b/pkg/scheduler/objects/ask_events_test.go
@@ -0,0 +1,161 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+       "strconv"
+       "testing"
+       "time"
+
+       "gotest.tools/v3/assert"
+
+       "github.com/apache/yunikorn-core/pkg/common/resources"
+       "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+var requestResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{
+       "memory": 100,
+       "cpu":    100,
+})
+
+func TestRequestDoesNotFitInQueueEvent(t *testing.T) {
+       ask := &AllocationAsk{
+               allocationKey:     "alloc-0",
+               applicationID:     "app-0",
+               allocatedResource: requestResource,
+       }
+       eventSystem := newEventSystemMockDisabled()
+       events := newAskEvents(ask, eventSystem)
+       events.sendRequestExceedsQueueHeadroom(getTestResource(), "root.test")
+       assert.Equal(t, 0, len(eventSystem.events))
+
+       eventSystem = newEventSystemMock()
+       events = newAskEvents(ask, eventSystem)
+       events.sendRequestExceedsQueueHeadroom(getTestResource(), "root.test")
+       assert.Equal(t, 1, len(eventSystem.events))
+       event := eventSystem.events[0]
+       assert.Equal(t, "alloc-0", event.ObjectID)
+       assert.Equal(t, "app-0", event.ReferenceID)
+       assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+       assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+       assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+       assert.Equal(t, "Request 'alloc-0' does not fit in queue 'root.test' 
(requested map[cpu:100 memory:100], available map[cpu:1])", event.Message)
+}
+
+func TestRequestFitsInQueueEvent(t *testing.T) {
+       ask := &AllocationAsk{
+               allocationKey:     "alloc-0",
+               applicationID:     "app-0",
+               allocatedResource: requestResource,
+       }
+       eventSystem := newEventSystemMockDisabled()
+       events := newAskEvents(ask, eventSystem)
+       events.sendRequestFitsInQueue("root.test")
+       assert.Equal(t, 0, len(eventSystem.events))
+
+       eventSystem = newEventSystemMock()
+       events = newAskEvents(ask, eventSystem)
+       events.sendRequestFitsInQueue("root.test")
+       assert.Equal(t, 1, len(eventSystem.events))
+       event := eventSystem.events[0]
+       assert.Equal(t, "alloc-0", event.ObjectID)
+       assert.Equal(t, "app-0", event.ReferenceID)
+       assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+       assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+       assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+       assert.Equal(t, "Request 'alloc-0' has become schedulable in queue 
'root.test'", event.Message)
+}
+
+func TestRequestExceedsUserQuotaEvent(t *testing.T) {
+       ask := &AllocationAsk{
+               allocationKey:     "alloc-0",
+               applicationID:     "app-0",
+               allocatedResource: requestResource,
+       }
+       eventSystem := newEventSystemMockDisabled()
+       events := newAskEvents(ask, eventSystem)
+       events.sendRequestExceedsUserQuota(getTestResource())
+       assert.Equal(t, 0, len(eventSystem.events))
+
+       eventSystem = newEventSystemMock()
+       events = newAskEvents(ask, eventSystem)
+       events.sendRequestExceedsUserQuota(getTestResource())
+       assert.Equal(t, 1, len(eventSystem.events))
+       event := eventSystem.events[0]
+       assert.Equal(t, "alloc-0", event.ObjectID)
+       assert.Equal(t, "app-0", event.ReferenceID)
+       assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+       assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+       assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+       assert.Equal(t, "Request 'alloc-0' exceeds the available user quota 
(requested map[cpu:100 memory:100], available map[cpu:1])", event.Message)
+}
+
+func TestRequestFitsInUserQuotaEvent(t *testing.T) {
+       ask := &AllocationAsk{
+               allocationKey:     "alloc-0",
+               applicationID:     "app-0",
+               allocatedResource: requestResource,
+       }
+       eventSystem := newEventSystemMockDisabled()
+       events := newAskEvents(ask, eventSystem)
+       events.sendRequestFitsInUserQuota()
+       assert.Equal(t, 0, len(eventSystem.events))
+
+       eventSystem = newEventSystemMock()
+       events = newAskEvents(ask, eventSystem)
+       events.sendRequestFitsInUserQuota()
+       assert.Equal(t, 1, len(eventSystem.events))
+       event := eventSystem.events[0]
+       assert.Equal(t, "alloc-0", event.ObjectID)
+       assert.Equal(t, "app-0", event.ReferenceID)
+       assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+       assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+       assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+       assert.Equal(t, "Request 'alloc-0' fits in the available user quota", 
event.Message)
+}
+
+func TestPredicateFailedEvents(t *testing.T) {
+       ask := &AllocationAsk{
+               allocationKey:     "alloc-0",
+               applicationID:     "app-0",
+               allocatedResource: requestResource,
+       }
+       eventSystem := newEventSystemMockDisabled()
+       events := newAskEvents(ask, eventSystem)
+       events.sendPredicateFailed("failed")
+       assert.Equal(t, 0, len(eventSystem.events))
+
+       eventSystem = newEventSystemMock()
+       events = newAskEventsWithRate(ask, eventSystem, 50*time.Millisecond, 1)
+       // only the first event is expected to be emitted due to rate limiting
+       for i := 0; i < 200; i++ {
+               events.sendPredicateFailed("failure-" + 
strconv.FormatUint(uint64(i), 10))
+       }
+       assert.Equal(t, 1, len(eventSystem.events))
+       event := eventSystem.events[0]
+       assert.Equal(t, "Predicate failed for request 'alloc-0' with message: 
'failure-0'", event.Message)
+
+       eventSystem.Reset()
+       // wait a bit, a new event is expected
+       time.Sleep(100 * time.Millisecond)
+       events.sendPredicateFailed("failed")
+       assert.Equal(t, 1, len(eventSystem.events))
+       event = eventSystem.events[0]
+       assert.Equal(t, "Predicate failed for request 'alloc-0' with message: 
'failed'", event.Message)
+}
diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go
index 12183a15..724880f7 100644
--- a/pkg/scheduler/objects/node.go
+++ b/pkg/scheduler/objects/node.go
@@ -406,7 +406,9 @@ func (sn *Node) preConditions(ask *AllocationAsk, allocate 
bool) bool {
                                zap.Bool("allocateFlag", allocate),
                                zap.Error(err))
                        // running predicates failed
-                       ask.LogAllocationFailure(err.Error(), allocate)
+                       msg := err.Error()
+                       ask.LogAllocationFailure(msg, allocate)
+                       ask.SendPredicateFailedEvent(msg)
                        return false
                }
        }
diff --git a/pkg/scheduler/objects/node_test.go 
b/pkg/scheduler/objects/node_test.go
index 3e99dbd5..e5ce3e4a 100644
--- a/pkg/scheduler/objects/node_test.go
+++ b/pkg/scheduler/objects/node_test.go
@@ -25,6 +25,8 @@ import (
        "gotest.tools/v3/assert"
 
        "github.com/apache/yunikorn-core/pkg/common/resources"
+       "github.com/apache/yunikorn-core/pkg/mock"
+       "github.com/apache/yunikorn-core/pkg/plugins"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
@@ -806,3 +808,36 @@ func TestNode_FitInNode(t *testing.T) {
                })
        }
 }
+
+func TestPreconditions(t *testing.T) {
+       current := plugins.GetResourceManagerCallbackPlugin()
+       defer func() {
+               plugins.RegisterSchedulerPlugin(current)
+       }()
+
+       plugins.RegisterSchedulerPlugin(mock.NewPredicatePlugin(true, 
map[string]int{}))
+       total := 
resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 100, 
"memory": 100})
+       occupied := 
resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 10, "memory": 
10})
+       proto := newProto(testNode, total, occupied, map[string]string{
+               "ready": "true",
+       })
+       res := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
+       ask := newAllocationAsk("test", "app001", res)
+       eventSystem := newEventSystemMock()
+       ask.askEvents = newAskEvents(ask, eventSystem)
+       node := NewNode(proto)
+
+       // failure
+       node.preConditions(ask, true)
+       assert.Equal(t, 1, len(eventSystem.events))
+       assert.Equal(t, "Predicate failed for request 'test' with message: 
'fake predicate plugin failed'", eventSystem.events[0].Message)
+       assert.Equal(t, 1, len(ask.allocLog))
+       assert.Equal(t, "fake predicate plugin failed", ask.allocLog["fake 
predicate plugin failed"].Message)
+
+       // pass
+       eventSystem.Reset()
+       plugins.RegisterSchedulerPlugin(mock.NewPredicatePlugin(false, 
map[string]int{}))
+       node.preConditions(ask, true)
+       assert.Equal(t, 0, len(eventSystem.events))
+       assert.Equal(t, 1, len(ask.allocLog))
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to