This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new b0721e24 [YUNIKORN-2566] Remove AllocationAsk reference from askEvents (#868)
b0721e24 is described below
commit b0721e242b4936c4801aea3e885f8e796d5f4f89
Author: Peter Bacsko <[email protected]>
AuthorDate: Thu May 23 12:31:16 2024 +0200
[YUNIKORN-2566] Remove AllocationAsk reference from askEvents (#868)
Closes: #868
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/scheduler/objects/allocation_ask.go | 14 ++---
pkg/scheduler/objects/allocation_ask_test.go | 4 +-
pkg/scheduler/objects/application_test.go | 4 +-
pkg/scheduler/objects/ask_events.go | 38 +++++++------
pkg/scheduler/objects/ask_events_test.go | 80 +++++++++++-----------------
pkg/scheduler/objects/node_test.go | 2 +-
6 files changed, 60 insertions(+), 82 deletions(-)
diff --git a/pkg/scheduler/objects/allocation_ask.go b/pkg/scheduler/objects/allocation_ask.go
index 4701beae..e213a1b6 100644
--- a/pkg/scheduler/objects/allocation_ask.go
+++ b/pkg/scheduler/objects/allocation_ask.go
@@ -77,9 +77,9 @@ func NewAllocationAsk(allocationKey string, applicationID
string, allocatedResou
allocatedResource: allocatedResource,
allocLog: make(map[string]*AllocationLogEntry),
resKeyPerNode: make(map[string]string),
+ askEvents: newAskEvents(events.GetEventSystem()),
}
aa.resKeyWithoutNode = reservationKeyWithoutNode(applicationID,
allocationKey)
- aa.askEvents = newAskEvents(aa, events.GetEventSystem())
return aa
}
@@ -99,6 +99,7 @@ func NewAllocationAskFromSI(ask *si.AllocationAsk)
*AllocationAsk {
originator: ask.Originator,
allocLog: make(map[string]*AllocationLogEntry),
resKeyPerNode: make(map[string]string),
+ askEvents: newAskEvents(events.GetEventSystem()),
}
// this is a safety check placeholder and task group name must be set
as a combo
// order is important as task group can be set without placeholder but
not the other way around
@@ -108,7 +109,6 @@ func NewAllocationAskFromSI(ask *si.AllocationAsk)
*AllocationAsk {
return nil
}
saa.resKeyWithoutNode = reservationKeyWithoutNode(ask.ApplicationID,
ask.AllocationKey)
- saa.askEvents = newAskEvents(saa, events.GetEventSystem())
return saa
}
@@ -260,7 +260,7 @@ func (aa *AllocationAsk) LogAllocationFailure(message
string, allocate bool) {
}
func (aa *AllocationAsk) SendPredicateFailedEvent(message string) {
- aa.askEvents.sendPredicateFailed(message)
+ aa.askEvents.sendPredicateFailed(aa.allocationKey, aa.applicationID,
message, aa.GetAllocatedResource())
}
// GetAllocationLog returns a list of log entries corresponding to allocation
preconditions not being met
@@ -344,7 +344,7 @@ func (aa *AllocationAsk) setHeadroomCheckFailed(headroom
*resources.Resource, qu
defer aa.Unlock()
if !aa.headroomCheckFailed {
aa.headroomCheckFailed = true
- aa.askEvents.sendRequestExceedsQueueHeadroom(headroom, queue)
+ aa.askEvents.sendRequestExceedsQueueHeadroom(aa.allocationKey,
aa.applicationID, headroom, aa.allocatedResource, queue)
}
}
@@ -353,7 +353,7 @@ func (aa *AllocationAsk) setHeadroomCheckPassed(queue
string) {
defer aa.Unlock()
if aa.headroomCheckFailed {
aa.headroomCheckFailed = false
- aa.askEvents.sendRequestFitsInQueue(queue)
+ aa.askEvents.sendRequestFitsInQueue(aa.allocationKey,
aa.applicationID, queue, aa.allocatedResource)
}
}
@@ -362,7 +362,7 @@ func (aa *AllocationAsk) setUserQuotaCheckFailed(available
*resources.Resource)
defer aa.Unlock()
if !aa.userQuotaCheckFailed {
aa.userQuotaCheckFailed = true
- aa.askEvents.sendRequestExceedsUserQuota(available)
+ aa.askEvents.sendRequestExceedsUserQuota(aa.allocationKey,
aa.applicationID, available, aa.allocatedResource)
}
}
@@ -371,6 +371,6 @@ func (aa *AllocationAsk) setUserQuotaCheckPassed() {
defer aa.Unlock()
if aa.userQuotaCheckFailed {
aa.userQuotaCheckFailed = false
- aa.askEvents.sendRequestFitsInUserQuota()
+ aa.askEvents.sendRequestFitsInUserQuota(aa.allocationKey,
aa.applicationID, aa.allocatedResource)
}
}
diff --git a/pkg/scheduler/objects/allocation_ask_test.go b/pkg/scheduler/objects/allocation_ask_test.go
index b2667c7d..a9739754 100644
--- a/pkg/scheduler/objects/allocation_ask_test.go
+++ b/pkg/scheduler/objects/allocation_ask_test.go
@@ -213,12 +213,12 @@ func TestSendPredicateFailed(t *testing.T) {
}
ask := NewAllocationAskFromSI(siAsk)
eventSystem := mock.NewEventSystemDisabled()
- ask.askEvents = newAskEvents(ask, eventSystem)
+ ask.askEvents = newAskEvents(eventSystem)
ask.SendPredicateFailedEvent("failed")
assert.Equal(t, 0, len(eventSystem.Events))
eventSystem = mock.NewEventSystem()
- ask.askEvents = newAskEvents(ask, eventSystem)
+ ask.askEvents = newAskEvents(eventSystem)
ask.SendPredicateFailedEvent("failure")
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index eab5ae5b..82939359 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -2247,7 +2247,7 @@ func TestRequestDoesNotFitQueueEvents(t *testing.T) {
ask := newAllocationAsk("alloc-0", "app-1", res)
app := newApplication(appID1, "default", "root.default")
eventSystem := mock.NewEventSystem()
- ask.askEvents = newAskEvents(ask, eventSystem)
+ ask.askEvents = newAskEvents(eventSystem)
app.disableStateChangeEvents()
app.resetAppEvents()
queue, err := createRootQueue(nil)
@@ -2318,7 +2318,7 @@ func TestRequestDoesNotFitUserQuotaQueueEvents(t
*testing.T) {
ask := newAllocationAsk("alloc-0", "app-1", res)
app := newApplication(appID1, "default", "root")
eventSystem := mock.NewEventSystem()
- ask.askEvents = newAskEvents(ask, eventSystem)
+ ask.askEvents = newAskEvents(eventSystem)
app.disableStateChangeEvents()
app.resetAppEvents()
queue, err := createRootQueue(nil)
diff --git a/pkg/scheduler/objects/ask_events.go b/pkg/scheduler/objects/ask_events.go
index 3994ef33..755374c6 100644
--- a/pkg/scheduler/objects/ask_events.go
+++ b/pkg/scheduler/objects/ask_events.go
@@ -31,63 +31,61 @@ import (
// Ask-specific events. These events are of REQUEST type, so they are
eventually sent to the respective pods in K8s.
type askEvents struct {
eventSystem events.EventSystem
- ask *AllocationAsk
limiter *rate.Limiter
}
-func (ae *askEvents) sendRequestExceedsQueueHeadroom(headroom
*resources.Resource, queuePath string) {
+func (ae *askEvents) sendRequestExceedsQueueHeadroom(allocKey, appID string,
headroom, allocatedResource *resources.Resource, queuePath string) {
if !ae.eventSystem.IsEventTrackingEnabled() {
return
}
- message := fmt.Sprintf("Request '%s' does not fit in queue '%s'
(requested %s, available %s)", ae.ask.allocationKey, queuePath,
ae.ask.GetAllocatedResource(), headroom)
- event := events.CreateRequestEventRecord(ae.ask.allocationKey,
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+ message := fmt.Sprintf("Request '%s' does not fit in queue '%s'
(requested %s, available %s)", allocKey, queuePath, allocatedResource, headroom)
+ event := events.CreateRequestEventRecord(allocKey, appID, message,
allocatedResource)
ae.eventSystem.AddEvent(event)
}
-func (ae *askEvents) sendRequestFitsInQueue(queuePath string) {
+func (ae *askEvents) sendRequestFitsInQueue(allocKey, appID, queuePath string,
allocatedResource *resources.Resource) {
if !ae.eventSystem.IsEventTrackingEnabled() {
return
}
- message := fmt.Sprintf("Request '%s' has become schedulable in queue
'%s'", ae.ask.allocationKey, queuePath)
- event := events.CreateRequestEventRecord(ae.ask.allocationKey,
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+ message := fmt.Sprintf("Request '%s' has become schedulable in queue
'%s'", allocKey, queuePath)
+ event := events.CreateRequestEventRecord(allocKey, appID, message,
allocatedResource)
ae.eventSystem.AddEvent(event)
}
-func (ae *askEvents) sendRequestExceedsUserQuota(headroom *resources.Resource)
{
+func (ae *askEvents) sendRequestExceedsUserQuota(allocKey, appID string,
headroom, allocatedResource *resources.Resource) {
if !ae.eventSystem.IsEventTrackingEnabled() {
return
}
- message := fmt.Sprintf("Request '%s' exceeds the available user quota
(requested %s, available %s)", ae.ask.allocationKey,
ae.ask.GetAllocatedResource(), headroom)
- event := events.CreateRequestEventRecord(ae.ask.allocationKey,
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+ message := fmt.Sprintf("Request '%s' exceeds the available user quota
(requested %s, available %s)", allocKey, allocatedResource, headroom)
+ event := events.CreateRequestEventRecord(allocKey, appID, message,
allocatedResource)
ae.eventSystem.AddEvent(event)
}
-func (ae *askEvents) sendRequestFitsInUserQuota() {
+func (ae *askEvents) sendRequestFitsInUserQuota(allocKey, appID string,
allocatedResource *resources.Resource) {
if !ae.eventSystem.IsEventTrackingEnabled() {
return
}
- message := fmt.Sprintf("Request '%s' fits in the available user quota",
ae.ask.allocationKey)
- event := events.CreateRequestEventRecord(ae.ask.allocationKey,
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+ message := fmt.Sprintf("Request '%s' fits in the available user quota",
allocKey)
+ event := events.CreateRequestEventRecord(allocKey, appID, message,
allocatedResource)
ae.eventSystem.AddEvent(event)
}
-func (ae *askEvents) sendPredicateFailed(predicateMsg string) {
+func (ae *askEvents) sendPredicateFailed(allocKey, appID, predicateMsg string,
allocatedResource *resources.Resource) {
if !ae.eventSystem.IsEventTrackingEnabled() || !ae.limiter.Allow() {
return
}
- message := fmt.Sprintf("Predicate failed for request '%s' with message:
'%s'", ae.ask.allocationKey, predicateMsg)
- event := events.CreateRequestEventRecord(ae.ask.allocationKey,
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+ message := fmt.Sprintf("Predicate failed for request '%s' with message:
'%s'", allocKey, predicateMsg)
+ event := events.CreateRequestEventRecord(allocKey, appID, message,
allocatedResource)
ae.eventSystem.AddEvent(event)
}
-func newAskEvents(ask *AllocationAsk, evt events.EventSystem) *askEvents {
- return newAskEventsWithRate(ask, evt, 15*time.Second, 1)
+func newAskEvents(evt events.EventSystem) *askEvents {
+ return newAskEventsWithRate(evt, 15*time.Second, 1)
}
-func newAskEventsWithRate(ask *AllocationAsk, evt events.EventSystem, interval
time.Duration, burst int) *askEvents {
+func newAskEventsWithRate(evt events.EventSystem, interval time.Duration,
burst int) *askEvents {
return &askEvents{
eventSystem: evt,
- ask: ask,
limiter: rate.NewLimiter(rate.Every(interval), burst),
}
}
diff --git a/pkg/scheduler/objects/ask_events_test.go b/pkg/scheduler/objects/ask_events_test.go
index 75630247..40342588 100644
--- a/pkg/scheduler/objects/ask_events_test.go
+++ b/pkg/scheduler/objects/ask_events_test.go
@@ -30,29 +30,27 @@ import (
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
+const allocKey = "alloc-0"
+
var requestResource =
resources.NewResourceFromMap(map[string]resources.Quantity{
"memory": 100,
"cpu": 100,
})
func TestRequestDoesNotFitInQueueEvent(t *testing.T) {
- ask := &AllocationAsk{
- allocationKey: "alloc-0",
- applicationID: "app-0",
- allocatedResource: requestResource,
- }
+ headroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
- events := newAskEvents(ask, eventSystem)
- events.sendRequestExceedsQueueHeadroom(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}), "root.test")
+ events := newAskEvents(eventSystem)
+ events.sendRequestExceedsQueueHeadroom(allocKey, appID1, headroom,
requestResource, "root.test")
assert.Equal(t, 0, len(eventSystem.Events))
eventSystem = mock.NewEventSystem()
- events = newAskEvents(ask, eventSystem)
- events.sendRequestExceedsQueueHeadroom(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}), "root.test")
+ events = newAskEvents(eventSystem)
+ events.sendRequestExceedsQueueHeadroom(allocKey, appID1, headroom,
requestResource, "root.test")
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
assert.Equal(t, "alloc-0", event.ObjectID)
- assert.Equal(t, "app-0", event.ReferenceID)
+ assert.Equal(t, appID1, event.ReferenceID)
assert.Equal(t, si.EventRecord_REQUEST, event.Type)
assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
@@ -60,23 +58,18 @@ func TestRequestDoesNotFitInQueueEvent(t *testing.T) {
}
func TestRequestFitsInQueueEvent(t *testing.T) {
- ask := &AllocationAsk{
- allocationKey: "alloc-0",
- applicationID: "app-0",
- allocatedResource: requestResource,
- }
eventSystem := mock.NewEventSystemDisabled()
- events := newAskEvents(ask, eventSystem)
- events.sendRequestFitsInQueue("root.test")
+ events := newAskEvents(eventSystem)
+ events.sendRequestFitsInQueue(allocKey, appID1, "root.test",
requestResource)
assert.Equal(t, 0, len(eventSystem.Events))
eventSystem = mock.NewEventSystem()
- events = newAskEvents(ask, eventSystem)
- events.sendRequestFitsInQueue("root.test")
+ events = newAskEvents(eventSystem)
+ events.sendRequestFitsInQueue(allocKey, appID1, "root.test",
requestResource)
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
assert.Equal(t, "alloc-0", event.ObjectID)
- assert.Equal(t, "app-0", event.ReferenceID)
+ assert.Equal(t, appID1, event.ReferenceID)
assert.Equal(t, si.EventRecord_REQUEST, event.Type)
assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
@@ -84,23 +77,19 @@ func TestRequestFitsInQueueEvent(t *testing.T) {
}
func TestRequestExceedsUserQuotaEvent(t *testing.T) {
- ask := &AllocationAsk{
- allocationKey: "alloc-0",
- applicationID: "app-0",
- allocatedResource: requestResource,
- }
+ headroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
- events := newAskEvents(ask, eventSystem)
- events.sendRequestExceedsUserQuota(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}))
+ events := newAskEvents(eventSystem)
+ events.sendRequestExceedsUserQuota(allocKey, appID1, headroom,
requestResource)
assert.Equal(t, 0, len(eventSystem.Events))
eventSystem = mock.NewEventSystem()
- events = newAskEvents(ask, eventSystem)
- events.sendRequestExceedsUserQuota(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}))
+ events = newAskEvents(eventSystem)
+ events.sendRequestExceedsUserQuota(allocKey, appID1, headroom,
requestResource)
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
assert.Equal(t, "alloc-0", event.ObjectID)
- assert.Equal(t, "app-0", event.ReferenceID)
+ assert.Equal(t, appID1, event.ReferenceID)
assert.Equal(t, si.EventRecord_REQUEST, event.Type)
assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
@@ -108,23 +97,18 @@ func TestRequestExceedsUserQuotaEvent(t *testing.T) {
}
func TestRequestFitsInUserQuotaEvent(t *testing.T) {
- ask := &AllocationAsk{
- allocationKey: "alloc-0",
- applicationID: "app-0",
- allocatedResource: requestResource,
- }
eventSystem := mock.NewEventSystemDisabled()
- events := newAskEvents(ask, eventSystem)
- events.sendRequestFitsInUserQuota()
+ events := newAskEvents(eventSystem)
+ events.sendRequestFitsInUserQuota(allocKey, appID1, requestResource)
assert.Equal(t, 0, len(eventSystem.Events))
eventSystem = mock.NewEventSystem()
- events = newAskEvents(ask, eventSystem)
- events.sendRequestFitsInUserQuota()
+ events = newAskEvents(eventSystem)
+ events.sendRequestFitsInUserQuota(allocKey, appID1, requestResource)
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
assert.Equal(t, "alloc-0", event.ObjectID)
- assert.Equal(t, "app-0", event.ReferenceID)
+ assert.Equal(t, appID1, event.ReferenceID)
assert.Equal(t, si.EventRecord_REQUEST, event.Type)
assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
@@ -132,21 +116,17 @@ func TestRequestFitsInUserQuotaEvent(t *testing.T) {
}
func TestPredicateFailedEvents(t *testing.T) {
- ask := &AllocationAsk{
- allocationKey: "alloc-0",
- applicationID: "app-0",
- allocatedResource: requestResource,
- }
+ resource :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
- events := newAskEvents(ask, eventSystem)
- events.sendPredicateFailed("failed")
+ events := newAskEvents(eventSystem)
+ events.sendPredicateFailed("alloc-0", "app-0", "failed", resource)
assert.Equal(t, 0, len(eventSystem.Events))
eventSystem = mock.NewEventSystem()
- events = newAskEventsWithRate(ask, eventSystem, 50*time.Millisecond, 1)
+ events = newAskEventsWithRate(eventSystem, 50*time.Millisecond, 1)
// only the first event is expected to be emitted due to rate limiting
for i := 0; i < 200; i++ {
- events.sendPredicateFailed("failure-" +
strconv.FormatUint(uint64(i), 10))
+ events.sendPredicateFailed("alloc-0", "app-0",
"failure-"+strconv.FormatUint(uint64(i), 10), resource)
}
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
@@ -155,7 +135,7 @@ func TestPredicateFailedEvents(t *testing.T) {
eventSystem.Reset()
// wait a bit, a new event is expected
time.Sleep(100 * time.Millisecond)
- events.sendPredicateFailed("failed")
+ events.sendPredicateFailed("alloc-0", "app-0", "failed", resource)
assert.Equal(t, 1, len(eventSystem.Events))
event = eventSystem.Events[0]
assert.Equal(t, "Predicate failed for request 'alloc-0' with message:
'failed'", event.Message)
diff --git a/pkg/scheduler/objects/node_test.go b/pkg/scheduler/objects/node_test.go
index dccb2790..bc17cba6 100644
--- a/pkg/scheduler/objects/node_test.go
+++ b/pkg/scheduler/objects/node_test.go
@@ -785,7 +785,7 @@ func TestPreconditions(t *testing.T) {
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
ask := newAllocationAsk("test", "app001", res)
eventSystem := evtMock.NewEventSystem()
- ask.askEvents = newAskEvents(ask, eventSystem)
+ ask.askEvents = newAskEvents(eventSystem)
node := NewNode(proto)
// failure
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]