This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new dc4f6efd [YUNIKORN-2370] Proper event handling for failed headroom
checks (#784)
dc4f6efd is described below
commit dc4f6efddce376e103314325a7e9236924c83b4e
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Feb 7 11:46:28 2024 +0100
[YUNIKORN-2370] Proper event handling for failed headroom checks (#784)
Closes: #784
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/scheduler/objects/allocation_ask.go | 47 +++++++
pkg/scheduler/objects/allocation_ask_test.go | 22 ++++
pkg/scheduler/objects/application.go | 6 +-
pkg/scheduler/objects/application_events.go | 10 --
pkg/scheduler/objects/application_events_test.go | 44 +------
pkg/scheduler/objects/application_test.go | 128 ++++++++++++++----
pkg/scheduler/objects/ask_events.go | 93 +++++++++++++
pkg/scheduler/objects/ask_events_test.go | 161 +++++++++++++++++++++++
pkg/scheduler/objects/node.go | 4 +-
pkg/scheduler/objects/node_test.go | 35 +++++
10 files changed, 471 insertions(+), 79 deletions(-)
diff --git a/pkg/scheduler/objects/allocation_ask.go
b/pkg/scheduler/objects/allocation_ask.go
index 7a42d04c..181e7f57 100644
--- a/pkg/scheduler/objects/allocation_ask.go
+++ b/pkg/scheduler/objects/allocation_ask.go
@@ -27,6 +27,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/resources"
+ "github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -59,6 +60,10 @@ type AllocationAsk struct {
scaleUpTriggered bool // whether this ask has triggered
autoscaling or not
resKeyPerNode map[string]string // reservation key for a given
node
+ askEvents *askEvents
+ userQuotaCheckFailed bool
+ headroomCheckFailed bool
+
sync.RWMutex
}
@@ -77,6 +82,7 @@ func NewAllocationAsk(allocationKey string, applicationID
string, allocatedResou
resKeyPerNode: make(map[string]string),
}
aa.resKeyWithoutNode = reservationKeyWithoutNode(applicationID,
allocationKey)
+ aa.askEvents = newAskEvents(aa, events.GetEventSystem())
return aa
}
@@ -110,6 +116,7 @@ func NewAllocationAskFromSI(ask *si.AllocationAsk)
*AllocationAsk {
return nil
}
saa.resKeyWithoutNode = reservationKeyWithoutNode(ask.ApplicationID,
ask.AllocationKey)
+ saa.askEvents = newAskEvents(saa, events.GetEventSystem())
return saa
}
@@ -261,6 +268,10 @@ func (aa *AllocationAsk) LogAllocationFailure(message
string, allocate bool) {
entry.Count++
}
+func (aa *AllocationAsk) SendPredicateFailedEvent(message string) {
+ aa.askEvents.sendPredicateFailed(message)
+}
+
// GetAllocationLog returns a list of log entries corresponding to allocation
preconditions not being met
func (aa *AllocationAsk) GetAllocationLog() []*AllocationLogEntry {
aa.RLock()
@@ -343,3 +354,39 @@ func (aa *AllocationAsk) getReservationKeyForNode(node
string) string {
defer aa.RUnlock()
return aa.resKeyPerNode[node]
}
+
+func (aa *AllocationAsk) setHeadroomCheckFailed(headroom *resources.Resource,
queue string) {
+ aa.Lock()
+ defer aa.Unlock()
+ if !aa.headroomCheckFailed {
+ aa.headroomCheckFailed = true
+ aa.askEvents.sendRequestExceedsQueueHeadroom(headroom, queue)
+ }
+}
+
+func (aa *AllocationAsk) setHeadroomCheckPassed(queue string) {
+ aa.Lock()
+ defer aa.Unlock()
+ if aa.headroomCheckFailed {
+ aa.headroomCheckFailed = false
+ aa.askEvents.sendRequestFitsInQueue(queue)
+ }
+}
+
+func (aa *AllocationAsk) setUserQuotaCheckFailed(available
*resources.Resource) {
+ aa.Lock()
+ defer aa.Unlock()
+ if !aa.userQuotaCheckFailed {
+ aa.userQuotaCheckFailed = true
+ aa.askEvents.sendRequestExceedsUserQuota(available)
+ }
+}
+
+func (aa *AllocationAsk) setUserQuotaCheckPassed() {
+ aa.Lock()
+ defer aa.Unlock()
+ if aa.userQuotaCheckFailed {
+ aa.userQuotaCheckFailed = false
+ aa.askEvents.sendRequestFitsInUserQuota()
+ }
+}
diff --git a/pkg/scheduler/objects/allocation_ask_test.go
b/pkg/scheduler/objects/allocation_ask_test.go
index df49e87d..e10e925e 100644
--- a/pkg/scheduler/objects/allocation_ask_test.go
+++ b/pkg/scheduler/objects/allocation_ask_test.go
@@ -229,6 +229,28 @@ func TestAllocationLog(t *testing.T) {
assert.Equal(t, 1, int(log[1].Count), "wrong count for event 2")
}
+func TestSendPredicateFailed(t *testing.T) {
+ res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+ siAsk := &si.AllocationAsk{
+ AllocationKey: "ask-1",
+ ApplicationID: "app-1",
+ MaxAllocations: 1,
+ ResourceAsk: res.ToProto(),
+ }
+ ask := NewAllocationAskFromSI(siAsk)
+ eventSystem := newEventSystemMockDisabled()
+ ask.askEvents = newAskEvents(ask, eventSystem)
+ ask.SendPredicateFailedEvent("failed")
+ assert.Equal(t, 0, len(eventSystem.events))
+
+ eventSystem = newEventSystemMock()
+ ask.askEvents = newAskEvents(ask, eventSystem)
+ ask.SendPredicateFailedEvent("failure")
+ assert.Equal(t, 1, len(eventSystem.events))
+ event := eventSystem.events[0]
+ assert.Equal(t, "Predicate failed for request 'ask-1' with message:
'failure'", event.Message)
+}
+
func sortedLog(ask *AllocationAsk) []*AllocationLogEntry {
log := ask.GetAllocationLog()
sort.SliceStable(log, func(i int, j int) bool {
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index 17fb264a..d26634da 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -955,9 +955,10 @@ func (sa *Application) tryAllocate(headRoom
*resources.Resource, allowPreemption
// as the preempted allocation must be for the same user in a
different queue in the hierarchy...
if !userHeadroom.FitInMaxUndef(request.GetAllocatedResource()) {
request.LogAllocationFailure(NotEnoughUserQuota, true)
// error message MUST be constant!
+ request.setUserQuotaCheckFailed(userHeadroom)
continue
}
-
+ request.setUserQuotaCheckPassed()
request.SetSchedulingAttempted(true)
// resource must fit in headroom otherwise skip the request
(unless preemption could help)
@@ -973,10 +974,11 @@ func (sa *Application) tryAllocate(headRoom
*resources.Resource, allowPreemption
}
}
}
- sa.appEvents.sendAppDoesNotFitEvent(request, headRoom)
request.LogAllocationFailure(NotEnoughQueueQuota, true)
// error message MUST be constant!
+ request.setHeadroomCheckFailed(headRoom, sa.queuePath)
continue
}
+ request.setHeadroomCheckPassed(sa.queuePath)
requiredNode := request.GetRequiredNode()
// does request have any constraint to run on specific node?
diff --git a/pkg/scheduler/objects/application_events.go
b/pkg/scheduler/objects/application_events.go
index 6f209826..87b0a7b3 100644
--- a/pkg/scheduler/objects/application_events.go
+++ b/pkg/scheduler/objects/application_events.go
@@ -25,7 +25,6 @@ import (
"golang.org/x/time/rate"
"github.com/apache/yunikorn-core/pkg/common"
- "github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -36,15 +35,6 @@ type applicationEvents struct {
limiter *rate.Limiter
}
-func (evt *applicationEvents) sendAppDoesNotFitEvent(request *AllocationAsk,
headroom *resources.Resource) {
- if !evt.eventSystem.IsEventTrackingEnabled() || !evt.limiter.Allow() {
- return
- }
- message := fmt.Sprintf("Application %s does not fit into %s queue
(request resoure %s, headroom %s)", request.GetApplicationID(),
evt.app.queuePath, request.GetAllocatedResource(), headroom)
- event := events.CreateRequestEventRecord(request.GetAllocationKey(),
request.GetApplicationID(), message, request.GetAllocatedResource())
- evt.eventSystem.AddEvent(event)
-}
-
func (evt *applicationEvents) sendPlaceholderLargerEvent(ph *Allocation,
request *AllocationAsk) {
if !evt.eventSystem.IsEventTrackingEnabled() {
return
diff --git a/pkg/scheduler/objects/application_events_test.go
b/pkg/scheduler/objects/application_events_test.go
index 827ccd02..c4d6d0e4 100644
--- a/pkg/scheduler/objects/application_events_test.go
+++ b/pkg/scheduler/objects/application_events_test.go
@@ -19,13 +19,10 @@
package objects
import (
- "testing"
- "time"
-
"gotest.tools/v3/assert"
+ "testing"
"github.com/apache/yunikorn-core/pkg/common"
- "github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -50,45 +47,6 @@ func isStateChangeEvent(t *testing.T, app *Application,
changeDetail si.EventRec
assert.Equal(t, changeDetail, record.EventChangeDetail, "incorrect
change detail")
}
-func TestSendAppDoesNotFitEvent(t *testing.T) {
- app := &Application{
- queuePath: "root.test",
- }
- mock := newEventSystemMockDisabled()
- appEvents := newApplicationEvents(app, mock)
- appEvents.sendAppDoesNotFitEvent(&AllocationAsk{},
&resources.Resource{})
- assert.Equal(t, 0, len(mock.events), "unexpected event")
-
- mock = newEventSystemMock()
- appEvents = newApplicationEvents(app, mock)
- appEvents.sendAppDoesNotFitEvent(&AllocationAsk{
- applicationID: appID0,
- allocationKey: aKey,
- }, &resources.Resource{})
- assert.Equal(t, 1, len(mock.events), "event was not generated")
-}
-
-func TestSendAppDoesNotFitEventWithRateLimiter(t *testing.T) {
- app := &Application{
- queuePath: "root.test",
- }
- mock := newEventSystemMock()
- appEvents := newApplicationEvents(app, mock)
- startTime := time.Now()
- for {
- elapsed := time.Since(startTime)
- if elapsed > 500*time.Millisecond {
- break
- }
- appEvents.sendAppDoesNotFitEvent(&AllocationAsk{
- applicationID: appID0,
- allocationKey: aKey,
- }, &resources.Resource{})
- time.Sleep(10 * time.Millisecond)
- }
- assert.Equal(t, 1, len(mock.events), "event was not generated")
-}
-
func TestSendPlaceholderLargerEvent(t *testing.T) {
app := &Application{
queuePath: "root.test",
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index b8286ff2..742f3ac8 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -2258,19 +2258,15 @@ func TestPlaceholderLargerEvent(t *testing.T) {
assert.Equal(t, "app-1", records[3].ReferenceID)
}
-func TestAppDoesNotFitEvent(t *testing.T) {
- resMap := map[string]string{"memory": "100", "vcores": "10"}
- res, err := resources.NewResourceFromConf(resMap)
- assert.NilError(t, err, "failed to create resource with error")
- headroomMap := map[string]string{"memory": "0", "vcores": "0"}
- headroom, err := resources.NewResourceFromConf(headroomMap)
- assert.NilError(t, err, "failed to create resource with error")
+func TestRequestDoesNotFitQueueEvents(t *testing.T) {
+ res, err := resources.NewResourceFromConf(map[string]string{"memory":
"100", "vcores": "10"})
+ assert.NilError(t, err)
+ headroom, err :=
resources.NewResourceFromConf(map[string]string{"memory": "0", "vcores": "0"})
+ assert.NilError(t, err)
ask := newAllocationAsk("alloc-0", "app-1", res)
app := newApplication(appID1, "default", "root.default")
- // Create event system after new application to avoid new application
event.
- events.Init()
- eventSystem := events.GetEventSystem().(*events.EventSystemImpl)
//nolint:errcheck
- eventSystem.StartServiceWithPublisher(false)
+ eventSystem := newEventSystemMock()
+ ask.askEvents = newAskEvents(ask, eventSystem)
app.disableStateChangeEvents()
app.resetAppEvents()
queue, err := createRootQueue(nil)
@@ -2281,20 +2277,106 @@ func TestAppDoesNotFitEvent(t *testing.T) {
app.sortedRequests = sr
attempts := 0
+ // try to allocate
+ app.tryAllocate(headroom, true, time.Second, &attempts,
nilNodeIterator, nilNodeIterator, nilGetNode)
+ assert.Equal(t, 1, len(eventSystem.events))
+ event := eventSystem.events[0]
+ assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+ assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+ assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+ assert.Equal(t, "app-1", event.ReferenceID)
+ assert.Equal(t, "alloc-0", event.ObjectID)
+ assert.Equal(t, "Request 'alloc-0' does not fit in queue 'root.default'
(requested map[memory:100 vcores:10], available map[memory:0 vcores:0])",
event.Message)
+
+ // second attempt - no new event
app.tryAllocate(headroom, true, time.Second, &attempts,
nilNodeIterator, nilNodeIterator, nilGetNode)
+ assert.Equal(t, 1, len(eventSystem.events))
- noEvents := 0
- err = common.WaitFor(10*time.Millisecond, time.Second, func() bool {
- noEvents = eventSystem.Store.CountStoredEvents()
- return noEvents == 2
- })
- assert.NilError(t, err, "expected 2 event, got %d", noEvents)
- records := eventSystem.Store.CollectEvents()
- assert.Equal(t, si.EventRecord_REQUEST, records[1].Type)
- assert.Equal(t, si.EventRecord_NONE, records[1].EventChangeType)
- assert.Equal(t, si.EventRecord_DETAILS_NONE,
records[1].EventChangeDetail)
- assert.Equal(t, "app-1", records[1].ReferenceID)
- assert.Equal(t, "alloc-0", records[1].ObjectID)
+ // third attempt with enough headroom - new event
+ eventSystem.Reset()
+ headroom, err =
resources.NewResourceFromConf(map[string]string{"memory": "1000", "vcores":
"1000"})
+ assert.NilError(t, err)
+ app.tryAllocate(headroom, true, time.Second, &attempts,
nilNodeIterator, nilNodeIterator, nilGetNode)
+ assert.Equal(t, 1, len(eventSystem.events))
+ event = eventSystem.events[0]
+ assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+ assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+ assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+ assert.Equal(t, "app-1", event.ReferenceID)
+ assert.Equal(t, "alloc-0", event.ObjectID)
+ assert.Equal(t, "Request 'alloc-0' has become schedulable in queue
'root.default'", event.Message)
+}
+
+func TestRequestDoesNotFitUserQuotaQueueEvents(t *testing.T) {
+ setupUGM()
+ // create config with resource limits for "testuser"
+ conf := configs.QueueConfig{
+ Name: "root",
+ Parent: true,
+ SubmitACL: "*",
+ Limits: []configs.Limit{
+ {
+ Limit: "leaf queue limit",
+ Users: []string{
+ "testuser",
+ },
+ MaxResources: map[string]string{
+ "memory": "1",
+ "vcores": "1",
+ },
+ },
+ },
+ }
+ err := ugm.GetUserManager().UpdateConfig(conf, "root")
+ assert.NilError(t, err)
+
+ res, err := resources.NewResourceFromConf(map[string]string{"memory":
"100", "vcores": "10"})
+ assert.NilError(t, err)
+ headroom, err :=
resources.NewResourceFromConf(map[string]string{"memory": "1000", "vcores":
"1000"})
+ assert.NilError(t, err)
+ ask := newAllocationAsk("alloc-0", "app-1", res)
+ app := newApplication(appID1, "default", "root")
+ eventSystem := newEventSystemMock()
+ ask.askEvents = newAskEvents(ask, eventSystem)
+ app.disableStateChangeEvents()
+ app.resetAppEvents()
+ queue, err := createRootQueue(nil)
+ assert.NilError(t, err, "queue create failed")
+ app.queue = queue
+ sr := sortedRequests{}
+ sr.insert(ask)
+ app.sortedRequests = sr
+ attempts := 0
+
+ // try to allocate
+ app.tryAllocate(headroom, true, time.Second, &attempts,
nilNodeIterator, nilNodeIterator, nilGetNode)
+ assert.Equal(t, 1, len(eventSystem.events))
+ event := eventSystem.events[0]
+ assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+ assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+ assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+ assert.Equal(t, "app-1", event.ReferenceID)
+ assert.Equal(t, "alloc-0", event.ObjectID)
+ assert.Equal(t, "Request 'alloc-0' exceeds the available user quota
(requested map[memory:100 vcores:10], available map[memory:1 vcores:1])",
event.Message)
+
+ // second attempt - no new event
+ app.tryAllocate(headroom, true, time.Second, &attempts,
nilNodeIterator, nilNodeIterator, nilGetNode)
+ assert.Equal(t, 1, len(eventSystem.events))
+
+ // third attempt with enough headroom - new event
+ eventSystem.Reset()
+ conf.Limits[0].MaxResources = nil
+ err = ugm.GetUserManager().UpdateConfig(conf, "root")
+ assert.NilError(t, err)
+ app.tryAllocate(headroom, true, time.Second, &attempts,
nilNodeIterator, nilNodeIterator, nilGetNode)
+ assert.Equal(t, 1, len(eventSystem.events))
+ event = eventSystem.events[0]
+ assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+ assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+ assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+ assert.Equal(t, "app-1", event.ReferenceID)
+ assert.Equal(t, "alloc-0", event.ObjectID)
+ assert.Equal(t, "Request 'alloc-0' fits in the available user quota",
event.Message)
}
func TestAllocationFailures(t *testing.T) {
diff --git a/pkg/scheduler/objects/ask_events.go
b/pkg/scheduler/objects/ask_events.go
new file mode 100644
index 00000000..3994ef33
--- /dev/null
+++ b/pkg/scheduler/objects/ask_events.go
@@ -0,0 +1,93 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+ "fmt"
+ "time"
+
+ "golang.org/x/time/rate"
+
+ "github.com/apache/yunikorn-core/pkg/common/resources"
+ "github.com/apache/yunikorn-core/pkg/events"
+)
+
+// Ask-specific events. These events are of REQUEST type, so they are
eventually sent to the respective pods in K8s.
+type askEvents struct {
+ eventSystem events.EventSystem
+ ask *AllocationAsk
+ limiter *rate.Limiter
+}
+
+func (ae *askEvents) sendRequestExceedsQueueHeadroom(headroom
*resources.Resource, queuePath string) {
+ if !ae.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ message := fmt.Sprintf("Request '%s' does not fit in queue '%s'
(requested %s, available %s)", ae.ask.allocationKey, queuePath,
ae.ask.GetAllocatedResource(), headroom)
+ event := events.CreateRequestEventRecord(ae.ask.allocationKey,
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+ ae.eventSystem.AddEvent(event)
+}
+
+func (ae *askEvents) sendRequestFitsInQueue(queuePath string) {
+ if !ae.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ message := fmt.Sprintf("Request '%s' has become schedulable in queue
'%s'", ae.ask.allocationKey, queuePath)
+ event := events.CreateRequestEventRecord(ae.ask.allocationKey,
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+ ae.eventSystem.AddEvent(event)
+}
+
+func (ae *askEvents) sendRequestExceedsUserQuota(headroom *resources.Resource)
{
+ if !ae.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ message := fmt.Sprintf("Request '%s' exceeds the available user quota
(requested %s, available %s)", ae.ask.allocationKey,
ae.ask.GetAllocatedResource(), headroom)
+ event := events.CreateRequestEventRecord(ae.ask.allocationKey,
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+ ae.eventSystem.AddEvent(event)
+}
+
// sendRequestFitsInUserQuota emits a REQUEST event stating that the ask fits
// in the quota available to the user again. Nothing is sent when event
// tracking is disabled.
func (ae *askEvents) sendRequestFitsInUserQuota() {
	if ae.eventSystem.IsEventTrackingEnabled() {
		msg := fmt.Sprintf("Request '%s' fits in the available user quota", ae.ask.allocationKey)
		record := events.CreateRequestEventRecord(ae.ask.allocationKey, ae.ask.applicationID, msg, ae.ask.GetAllocatedResource())
		ae.eventSystem.AddEvent(record)
	}
}
+
+func (ae *askEvents) sendPredicateFailed(predicateMsg string) {
+ if !ae.eventSystem.IsEventTrackingEnabled() || !ae.limiter.Allow() {
+ return
+ }
+ message := fmt.Sprintf("Predicate failed for request '%s' with message:
'%s'", ae.ask.allocationKey, predicateMsg)
+ event := events.CreateRequestEventRecord(ae.ask.allocationKey,
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+ ae.eventSystem.AddEvent(event)
+}
+
+func newAskEvents(ask *AllocationAsk, evt events.EventSystem) *askEvents {
+ return newAskEventsWithRate(ask, evt, 15*time.Second, 1)
+}
+
// newAskEventsWithRate creates the event helper for ask with a custom limiter
// that refills one token per interval and holds at most burst tokens.
func newAskEventsWithRate(ask *AllocationAsk, evt events.EventSystem, interval time.Duration, burst int) *askEvents {
	limit := rate.Every(interval)
	ae := &askEvents{
		ask:         ask,
		eventSystem: evt,
		limiter:     rate.NewLimiter(limit, burst),
	}
	return ae
}
diff --git a/pkg/scheduler/objects/ask_events_test.go
b/pkg/scheduler/objects/ask_events_test.go
new file mode 100644
index 00000000..cffe4693
--- /dev/null
+++ b/pkg/scheduler/objects/ask_events_test.go
@@ -0,0 +1,161 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+ "strconv"
+ "testing"
+ "time"
+
+ "gotest.tools/v3/assert"
+
+ "github.com/apache/yunikorn-core/pkg/common/resources"
+ "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+var requestResource =
resources.NewResourceFromMap(map[string]resources.Quantity{
+ "memory": 100,
+ "cpu": 100,
+})
+
+func TestRequestDoesNotFitInQueueEvent(t *testing.T) {
+ ask := &AllocationAsk{
+ allocationKey: "alloc-0",
+ applicationID: "app-0",
+ allocatedResource: requestResource,
+ }
+ eventSystem := newEventSystemMockDisabled()
+ events := newAskEvents(ask, eventSystem)
+ events.sendRequestExceedsQueueHeadroom(getTestResource(), "root.test")
+ assert.Equal(t, 0, len(eventSystem.events))
+
+ eventSystem = newEventSystemMock()
+ events = newAskEvents(ask, eventSystem)
+ events.sendRequestExceedsQueueHeadroom(getTestResource(), "root.test")
+ assert.Equal(t, 1, len(eventSystem.events))
+ event := eventSystem.events[0]
+ assert.Equal(t, "alloc-0", event.ObjectID)
+ assert.Equal(t, "app-0", event.ReferenceID)
+ assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+ assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+ assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+ assert.Equal(t, "Request 'alloc-0' does not fit in queue 'root.test'
(requested map[cpu:100 memory:100], available map[cpu:1])", event.Message)
+}
+
+func TestRequestFitsInQueueEvent(t *testing.T) {
+ ask := &AllocationAsk{
+ allocationKey: "alloc-0",
+ applicationID: "app-0",
+ allocatedResource: requestResource,
+ }
+ eventSystem := newEventSystemMockDisabled()
+ events := newAskEvents(ask, eventSystem)
+ events.sendRequestFitsInQueue("root.test")
+ assert.Equal(t, 0, len(eventSystem.events))
+
+ eventSystem = newEventSystemMock()
+ events = newAskEvents(ask, eventSystem)
+ events.sendRequestFitsInQueue("root.test")
+ assert.Equal(t, 1, len(eventSystem.events))
+ event := eventSystem.events[0]
+ assert.Equal(t, "alloc-0", event.ObjectID)
+ assert.Equal(t, "app-0", event.ReferenceID)
+ assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+ assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+ assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+ assert.Equal(t, "Request 'alloc-0' has become schedulable in queue
'root.test'", event.Message)
+}
+
+func TestRequestExceedsUserQuotaEvent(t *testing.T) {
+ ask := &AllocationAsk{
+ allocationKey: "alloc-0",
+ applicationID: "app-0",
+ allocatedResource: requestResource,
+ }
+ eventSystem := newEventSystemMockDisabled()
+ events := newAskEvents(ask, eventSystem)
+ events.sendRequestExceedsUserQuota(getTestResource())
+ assert.Equal(t, 0, len(eventSystem.events))
+
+ eventSystem = newEventSystemMock()
+ events = newAskEvents(ask, eventSystem)
+ events.sendRequestExceedsUserQuota(getTestResource())
+ assert.Equal(t, 1, len(eventSystem.events))
+ event := eventSystem.events[0]
+ assert.Equal(t, "alloc-0", event.ObjectID)
+ assert.Equal(t, "app-0", event.ReferenceID)
+ assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+ assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+ assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+ assert.Equal(t, "Request 'alloc-0' exceeds the available user quota
(requested map[cpu:100 memory:100], available map[cpu:1])", event.Message)
+}
+
// TestRequestFitsInUserQuotaEvent checks the "fits in user quota" event:
// nothing is sent while tracking is disabled, and one REQUEST event otherwise.
func TestRequestFitsInUserQuotaEvent(t *testing.T) {
	ask := &AllocationAsk{
		allocationKey:     "alloc-0",
		applicationID:     "app-0",
		allocatedResource: requestResource,
	}

	// tracking disabled: no event
	disabled := newEventSystemMockDisabled()
	newAskEvents(ask, disabled).sendRequestFitsInUserQuota()
	assert.Equal(t, 0, len(disabled.events))

	// tracking enabled: exactly one REQUEST event
	enabled := newEventSystemMock()
	newAskEvents(ask, enabled).sendRequestFitsInUserQuota()
	assert.Equal(t, 1, len(enabled.events))
	evt := enabled.events[0]
	assert.Equal(t, "alloc-0", evt.ObjectID)
	assert.Equal(t, "app-0", evt.ReferenceID)
	assert.Equal(t, si.EventRecord_REQUEST, evt.Type)
	assert.Equal(t, si.EventRecord_NONE, evt.EventChangeType)
	assert.Equal(t, si.EventRecord_DETAILS_NONE, evt.EventChangeDetail)
	assert.Equal(t, "Request 'alloc-0' fits in the available user quota", evt.Message)
}
+
+func TestPredicateFailedEvents(t *testing.T) {
+ ask := &AllocationAsk{
+ allocationKey: "alloc-0",
+ applicationID: "app-0",
+ allocatedResource: requestResource,
+ }
+ eventSystem := newEventSystemMockDisabled()
+ events := newAskEvents(ask, eventSystem)
+ events.sendPredicateFailed("failed")
+ assert.Equal(t, 0, len(eventSystem.events))
+
+ eventSystem = newEventSystemMock()
+ events = newAskEventsWithRate(ask, eventSystem, 50*time.Millisecond, 1)
+ // only the first event is expected to be emitted due to rate limiting
+ for i := 0; i < 200; i++ {
+ events.sendPredicateFailed("failure-" +
strconv.FormatUint(uint64(i), 10))
+ }
+ assert.Equal(t, 1, len(eventSystem.events))
+ event := eventSystem.events[0]
+ assert.Equal(t, "Predicate failed for request 'alloc-0' with message:
'failure-0'", event.Message)
+
+ eventSystem.Reset()
+ // wait a bit, a new event is expected
+ time.Sleep(100 * time.Millisecond)
+ events.sendPredicateFailed("failed")
+ assert.Equal(t, 1, len(eventSystem.events))
+ event = eventSystem.events[0]
+ assert.Equal(t, "Predicate failed for request 'alloc-0' with message:
'failed'", event.Message)
+}
diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go
index 12183a15..724880f7 100644
--- a/pkg/scheduler/objects/node.go
+++ b/pkg/scheduler/objects/node.go
@@ -406,7 +406,9 @@ func (sn *Node) preConditions(ask *AllocationAsk, allocate
bool) bool {
zap.Bool("allocateFlag", allocate),
zap.Error(err))
// running predicates failed
- ask.LogAllocationFailure(err.Error(), allocate)
+ msg := err.Error()
+ ask.LogAllocationFailure(msg, allocate)
+ ask.SendPredicateFailedEvent(msg)
return false
}
}
diff --git a/pkg/scheduler/objects/node_test.go
b/pkg/scheduler/objects/node_test.go
index 3e99dbd5..e5ce3e4a 100644
--- a/pkg/scheduler/objects/node_test.go
+++ b/pkg/scheduler/objects/node_test.go
@@ -25,6 +25,8 @@ import (
"gotest.tools/v3/assert"
"github.com/apache/yunikorn-core/pkg/common/resources"
+ "github.com/apache/yunikorn-core/pkg/mock"
+ "github.com/apache/yunikorn-core/pkg/plugins"
"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -806,3 +808,36 @@ func TestNode_FitInNode(t *testing.T) {
})
}
}
+
// TestPreconditions checks that a failing predicate in Node.preConditions both
// records an allocation log entry and emits a predicate failure event, while a
// passing predicate does neither.
func TestPreconditions(t *testing.T) {
	// put back the plugin that was registered before this test
	// NOTE(review): if no plugin was registered, previous is nil and it is not
	// verified here that re-registering nil clears the mock plugin — confirm
	// against plugins.RegisterSchedulerPlugin.
	previous := plugins.GetResourceManagerCallbackPlugin()
	defer plugins.RegisterSchedulerPlugin(previous)

	plugins.RegisterSchedulerPlugin(mock.NewPredicatePlugin(true, map[string]int{}))
	total := resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 100, "memory": 100})
	occupied := resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 10, "memory": 10})
	proto := newProto(testNode, total, occupied, map[string]string{"ready": "true"})
	askRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
	ask := newAllocationAsk("test", "app001", askRes)
	eventSystem := newEventSystemMock()
	ask.askEvents = newAskEvents(ask, eventSystem)
	node := NewNode(proto)

	const predicateMsg = "fake predicate plugin failed"

	// predicate plugin set to fail: one event and one allocation log entry
	node.preConditions(ask, true)
	assert.Equal(t, 1, len(eventSystem.events))
	assert.Equal(t, "Predicate failed for request 'test' with message: 'fake predicate plugin failed'", eventSystem.events[0].Message)
	assert.Equal(t, 1, len(ask.allocLog))
	assert.Equal(t, predicateMsg, ask.allocLog[predicateMsg].Message)

	// predicate plugin set to pass: no new event, log left unchanged
	eventSystem.Reset()
	plugins.RegisterSchedulerPlugin(mock.NewPredicatePlugin(false, map[string]int{}))
	node.preConditions(ask, true)
	assert.Equal(t, 0, len(eventSystem.events))
	assert.Equal(t, 1, len(ask.allocLog))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]