This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new b0721e24 [YUNIKORN-2566] Remove AllocationAsk reference from askEvents 
(#868)
b0721e24 is described below

commit b0721e242b4936c4801aea3e885f8e796d5f4f89
Author: Peter Bacsko <[email protected]>
AuthorDate: Thu May 23 12:31:16 2024 +0200

    [YUNIKORN-2566] Remove AllocationAsk reference from askEvents (#868)
    
    Closes: #868
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/scheduler/objects/allocation_ask.go      | 14 ++---
 pkg/scheduler/objects/allocation_ask_test.go |  4 +-
 pkg/scheduler/objects/application_test.go    |  4 +-
 pkg/scheduler/objects/ask_events.go          | 38 +++++++------
 pkg/scheduler/objects/ask_events_test.go     | 80 +++++++++++-----------------
 pkg/scheduler/objects/node_test.go           |  2 +-
 6 files changed, 60 insertions(+), 82 deletions(-)

diff --git a/pkg/scheduler/objects/allocation_ask.go 
b/pkg/scheduler/objects/allocation_ask.go
index 4701beae..e213a1b6 100644
--- a/pkg/scheduler/objects/allocation_ask.go
+++ b/pkg/scheduler/objects/allocation_ask.go
@@ -77,9 +77,9 @@ func NewAllocationAsk(allocationKey string, applicationID 
string, allocatedResou
                allocatedResource: allocatedResource,
                allocLog:          make(map[string]*AllocationLogEntry),
                resKeyPerNode:     make(map[string]string),
+               askEvents:         newAskEvents(events.GetEventSystem()),
        }
        aa.resKeyWithoutNode = reservationKeyWithoutNode(applicationID, 
allocationKey)
-       aa.askEvents = newAskEvents(aa, events.GetEventSystem())
        return aa
 }
 
@@ -99,6 +99,7 @@ func NewAllocationAskFromSI(ask *si.AllocationAsk) 
*AllocationAsk {
                originator:        ask.Originator,
                allocLog:          make(map[string]*AllocationLogEntry),
                resKeyPerNode:     make(map[string]string),
+               askEvents:         newAskEvents(events.GetEventSystem()),
        }
        // this is a safety check placeholder and task group name must be set 
as a combo
        // order is important as task group can be set without placeholder but 
not the other way around
@@ -108,7 +109,6 @@ func NewAllocationAskFromSI(ask *si.AllocationAsk) 
*AllocationAsk {
                return nil
        }
        saa.resKeyWithoutNode = reservationKeyWithoutNode(ask.ApplicationID, 
ask.AllocationKey)
-       saa.askEvents = newAskEvents(saa, events.GetEventSystem())
        return saa
 }
 
@@ -260,7 +260,7 @@ func (aa *AllocationAsk) LogAllocationFailure(message 
string, allocate bool) {
 }
 
 func (aa *AllocationAsk) SendPredicateFailedEvent(message string) {
-       aa.askEvents.sendPredicateFailed(message)
+       aa.askEvents.sendPredicateFailed(aa.allocationKey, aa.applicationID, 
message, aa.GetAllocatedResource())
 }
 
 // GetAllocationLog returns a list of log entries corresponding to allocation 
preconditions not being met
@@ -344,7 +344,7 @@ func (aa *AllocationAsk) setHeadroomCheckFailed(headroom 
*resources.Resource, qu
        defer aa.Unlock()
        if !aa.headroomCheckFailed {
                aa.headroomCheckFailed = true
-               aa.askEvents.sendRequestExceedsQueueHeadroom(headroom, queue)
+               aa.askEvents.sendRequestExceedsQueueHeadroom(aa.allocationKey, 
aa.applicationID, headroom, aa.allocatedResource, queue)
        }
 }
 
@@ -353,7 +353,7 @@ func (aa *AllocationAsk) setHeadroomCheckPassed(queue 
string) {
        defer aa.Unlock()
        if aa.headroomCheckFailed {
                aa.headroomCheckFailed = false
-               aa.askEvents.sendRequestFitsInQueue(queue)
+               aa.askEvents.sendRequestFitsInQueue(aa.allocationKey, 
aa.applicationID, queue, aa.allocatedResource)
        }
 }
 
@@ -362,7 +362,7 @@ func (aa *AllocationAsk) setUserQuotaCheckFailed(available 
*resources.Resource)
        defer aa.Unlock()
        if !aa.userQuotaCheckFailed {
                aa.userQuotaCheckFailed = true
-               aa.askEvents.sendRequestExceedsUserQuota(available)
+               aa.askEvents.sendRequestExceedsUserQuota(aa.allocationKey, 
aa.applicationID, available, aa.allocatedResource)
        }
 }
 
@@ -371,6 +371,6 @@ func (aa *AllocationAsk) setUserQuotaCheckPassed() {
        defer aa.Unlock()
        if aa.userQuotaCheckFailed {
                aa.userQuotaCheckFailed = false
-               aa.askEvents.sendRequestFitsInUserQuota()
+               aa.askEvents.sendRequestFitsInUserQuota(aa.allocationKey, 
aa.applicationID, aa.allocatedResource)
        }
 }
diff --git a/pkg/scheduler/objects/allocation_ask_test.go 
b/pkg/scheduler/objects/allocation_ask_test.go
index b2667c7d..a9739754 100644
--- a/pkg/scheduler/objects/allocation_ask_test.go
+++ b/pkg/scheduler/objects/allocation_ask_test.go
@@ -213,12 +213,12 @@ func TestSendPredicateFailed(t *testing.T) {
        }
        ask := NewAllocationAskFromSI(siAsk)
        eventSystem := mock.NewEventSystemDisabled()
-       ask.askEvents = newAskEvents(ask, eventSystem)
+       ask.askEvents = newAskEvents(eventSystem)
        ask.SendPredicateFailedEvent("failed")
        assert.Equal(t, 0, len(eventSystem.Events))
 
        eventSystem = mock.NewEventSystem()
-       ask.askEvents = newAskEvents(ask, eventSystem)
+       ask.askEvents = newAskEvents(eventSystem)
        ask.SendPredicateFailedEvent("failure")
        assert.Equal(t, 1, len(eventSystem.Events))
        event := eventSystem.Events[0]
diff --git a/pkg/scheduler/objects/application_test.go 
b/pkg/scheduler/objects/application_test.go
index eab5ae5b..82939359 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -2247,7 +2247,7 @@ func TestRequestDoesNotFitQueueEvents(t *testing.T) {
        ask := newAllocationAsk("alloc-0", "app-1", res)
        app := newApplication(appID1, "default", "root.default")
        eventSystem := mock.NewEventSystem()
-       ask.askEvents = newAskEvents(ask, eventSystem)
+       ask.askEvents = newAskEvents(eventSystem)
        app.disableStateChangeEvents()
        app.resetAppEvents()
        queue, err := createRootQueue(nil)
@@ -2318,7 +2318,7 @@ func TestRequestDoesNotFitUserQuotaQueueEvents(t 
*testing.T) {
        ask := newAllocationAsk("alloc-0", "app-1", res)
        app := newApplication(appID1, "default", "root")
        eventSystem := mock.NewEventSystem()
-       ask.askEvents = newAskEvents(ask, eventSystem)
+       ask.askEvents = newAskEvents(eventSystem)
        app.disableStateChangeEvents()
        app.resetAppEvents()
        queue, err := createRootQueue(nil)
diff --git a/pkg/scheduler/objects/ask_events.go 
b/pkg/scheduler/objects/ask_events.go
index 3994ef33..755374c6 100644
--- a/pkg/scheduler/objects/ask_events.go
+++ b/pkg/scheduler/objects/ask_events.go
@@ -31,63 +31,61 @@ import (
 // Ask-specific events. These events are of REQUEST type, so they are 
eventually sent to the respective pods in K8s.
 type askEvents struct {
        eventSystem events.EventSystem
-       ask         *AllocationAsk
        limiter     *rate.Limiter
 }
 
-func (ae *askEvents) sendRequestExceedsQueueHeadroom(headroom 
*resources.Resource, queuePath string) {
+func (ae *askEvents) sendRequestExceedsQueueHeadroom(allocKey, appID string, 
headroom, allocatedResource *resources.Resource, queuePath string) {
        if !ae.eventSystem.IsEventTrackingEnabled() {
                return
        }
-       message := fmt.Sprintf("Request '%s' does not fit in queue '%s' 
(requested %s, available %s)", ae.ask.allocationKey, queuePath, 
ae.ask.GetAllocatedResource(), headroom)
-       event := events.CreateRequestEventRecord(ae.ask.allocationKey, 
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+       message := fmt.Sprintf("Request '%s' does not fit in queue '%s' 
(requested %s, available %s)", allocKey, queuePath, allocatedResource, headroom)
+       event := events.CreateRequestEventRecord(allocKey, appID, message, 
allocatedResource)
        ae.eventSystem.AddEvent(event)
 }
 
-func (ae *askEvents) sendRequestFitsInQueue(queuePath string) {
+func (ae *askEvents) sendRequestFitsInQueue(allocKey, appID, queuePath string, 
allocatedResource *resources.Resource) {
        if !ae.eventSystem.IsEventTrackingEnabled() {
                return
        }
-       message := fmt.Sprintf("Request '%s' has become schedulable in queue 
'%s'", ae.ask.allocationKey, queuePath)
-       event := events.CreateRequestEventRecord(ae.ask.allocationKey, 
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+       message := fmt.Sprintf("Request '%s' has become schedulable in queue 
'%s'", allocKey, queuePath)
+       event := events.CreateRequestEventRecord(allocKey, appID, message, 
allocatedResource)
        ae.eventSystem.AddEvent(event)
 }
 
-func (ae *askEvents) sendRequestExceedsUserQuota(headroom *resources.Resource) 
{
+func (ae *askEvents) sendRequestExceedsUserQuota(allocKey, appID string, 
headroom, allocatedResource *resources.Resource) {
        if !ae.eventSystem.IsEventTrackingEnabled() {
                return
        }
-       message := fmt.Sprintf("Request '%s' exceeds the available user quota 
(requested %s, available %s)", ae.ask.allocationKey, 
ae.ask.GetAllocatedResource(), headroom)
-       event := events.CreateRequestEventRecord(ae.ask.allocationKey, 
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+       message := fmt.Sprintf("Request '%s' exceeds the available user quota 
(requested %s, available %s)", allocKey, allocatedResource, headroom)
+       event := events.CreateRequestEventRecord(allocKey, appID, message, 
allocatedResource)
        ae.eventSystem.AddEvent(event)
 }
 
-func (ae *askEvents) sendRequestFitsInUserQuota() {
+func (ae *askEvents) sendRequestFitsInUserQuota(allocKey, appID string, 
allocatedResource *resources.Resource) {
        if !ae.eventSystem.IsEventTrackingEnabled() {
                return
        }
-       message := fmt.Sprintf("Request '%s' fits in the available user quota", 
ae.ask.allocationKey)
-       event := events.CreateRequestEventRecord(ae.ask.allocationKey, 
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+       message := fmt.Sprintf("Request '%s' fits in the available user quota", 
allocKey)
+       event := events.CreateRequestEventRecord(allocKey, appID, message, 
allocatedResource)
        ae.eventSystem.AddEvent(event)
 }
 
-func (ae *askEvents) sendPredicateFailed(predicateMsg string) {
+func (ae *askEvents) sendPredicateFailed(allocKey, appID, predicateMsg string, 
allocatedResource *resources.Resource) {
        if !ae.eventSystem.IsEventTrackingEnabled() || !ae.limiter.Allow() {
                return
        }
-       message := fmt.Sprintf("Predicate failed for request '%s' with message: 
'%s'", ae.ask.allocationKey, predicateMsg)
-       event := events.CreateRequestEventRecord(ae.ask.allocationKey, 
ae.ask.applicationID, message, ae.ask.GetAllocatedResource())
+       message := fmt.Sprintf("Predicate failed for request '%s' with message: 
'%s'", allocKey, predicateMsg)
+       event := events.CreateRequestEventRecord(allocKey, appID, message, 
allocatedResource)
        ae.eventSystem.AddEvent(event)
 }
 
-func newAskEvents(ask *AllocationAsk, evt events.EventSystem) *askEvents {
-       return newAskEventsWithRate(ask, evt, 15*time.Second, 1)
+func newAskEvents(evt events.EventSystem) *askEvents {
+       return newAskEventsWithRate(evt, 15*time.Second, 1)
 }
 
-func newAskEventsWithRate(ask *AllocationAsk, evt events.EventSystem, interval 
time.Duration, burst int) *askEvents {
+func newAskEventsWithRate(evt events.EventSystem, interval time.Duration, 
burst int) *askEvents {
        return &askEvents{
                eventSystem: evt,
-               ask:         ask,
                limiter:     rate.NewLimiter(rate.Every(interval), burst),
        }
 }
diff --git a/pkg/scheduler/objects/ask_events_test.go 
b/pkg/scheduler/objects/ask_events_test.go
index 75630247..40342588 100644
--- a/pkg/scheduler/objects/ask_events_test.go
+++ b/pkg/scheduler/objects/ask_events_test.go
@@ -30,29 +30,27 @@ import (
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
+const allocKey = "alloc-0"
+
 var requestResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{
        "memory": 100,
        "cpu":    100,
 })
 
 func TestRequestDoesNotFitInQueueEvent(t *testing.T) {
-       ask := &AllocationAsk{
-               allocationKey:     "alloc-0",
-               applicationID:     "app-0",
-               allocatedResource: requestResource,
-       }
+       headroom := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
        eventSystem := mock.NewEventSystemDisabled()
-       events := newAskEvents(ask, eventSystem)
-       
events.sendRequestExceedsQueueHeadroom(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 1}), "root.test")
+       events := newAskEvents(eventSystem)
+       events.sendRequestExceedsQueueHeadroom(allocKey, appID1, headroom, 
requestResource, "root.test")
        assert.Equal(t, 0, len(eventSystem.Events))
 
        eventSystem = mock.NewEventSystem()
-       events = newAskEvents(ask, eventSystem)
-       
events.sendRequestExceedsQueueHeadroom(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 1}), "root.test")
+       events = newAskEvents(eventSystem)
+       events.sendRequestExceedsQueueHeadroom(allocKey, appID1, headroom, 
requestResource, "root.test")
        assert.Equal(t, 1, len(eventSystem.Events))
        event := eventSystem.Events[0]
        assert.Equal(t, "alloc-0", event.ObjectID)
-       assert.Equal(t, "app-0", event.ReferenceID)
+       assert.Equal(t, appID1, event.ReferenceID)
        assert.Equal(t, si.EventRecord_REQUEST, event.Type)
        assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
        assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
@@ -60,23 +58,18 @@ func TestRequestDoesNotFitInQueueEvent(t *testing.T) {
 }
 
 func TestRequestFitsInQueueEvent(t *testing.T) {
-       ask := &AllocationAsk{
-               allocationKey:     "alloc-0",
-               applicationID:     "app-0",
-               allocatedResource: requestResource,
-       }
        eventSystem := mock.NewEventSystemDisabled()
-       events := newAskEvents(ask, eventSystem)
-       events.sendRequestFitsInQueue("root.test")
+       events := newAskEvents(eventSystem)
+       events.sendRequestFitsInQueue(allocKey, appID1, "root.test", 
requestResource)
        assert.Equal(t, 0, len(eventSystem.Events))
 
        eventSystem = mock.NewEventSystem()
-       events = newAskEvents(ask, eventSystem)
-       events.sendRequestFitsInQueue("root.test")
+       events = newAskEvents(eventSystem)
+       events.sendRequestFitsInQueue(allocKey, appID1, "root.test", 
requestResource)
        assert.Equal(t, 1, len(eventSystem.Events))
        event := eventSystem.Events[0]
        assert.Equal(t, "alloc-0", event.ObjectID)
-       assert.Equal(t, "app-0", event.ReferenceID)
+       assert.Equal(t, appID1, event.ReferenceID)
        assert.Equal(t, si.EventRecord_REQUEST, event.Type)
        assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
        assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
@@ -84,23 +77,19 @@ func TestRequestFitsInQueueEvent(t *testing.T) {
 }
 
 func TestRequestExceedsUserQuotaEvent(t *testing.T) {
-       ask := &AllocationAsk{
-               allocationKey:     "alloc-0",
-               applicationID:     "app-0",
-               allocatedResource: requestResource,
-       }
+       headroom := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
        eventSystem := mock.NewEventSystemDisabled()
-       events := newAskEvents(ask, eventSystem)
-       
events.sendRequestExceedsUserQuota(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 1}))
+       events := newAskEvents(eventSystem)
+       events.sendRequestExceedsUserQuota(allocKey, appID1, headroom, 
requestResource)
        assert.Equal(t, 0, len(eventSystem.Events))
 
        eventSystem = mock.NewEventSystem()
-       events = newAskEvents(ask, eventSystem)
-       
events.sendRequestExceedsUserQuota(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 1}))
+       events = newAskEvents(eventSystem)
+       events.sendRequestExceedsUserQuota(allocKey, appID1, headroom, 
requestResource)
        assert.Equal(t, 1, len(eventSystem.Events))
        event := eventSystem.Events[0]
        assert.Equal(t, "alloc-0", event.ObjectID)
-       assert.Equal(t, "app-0", event.ReferenceID)
+       assert.Equal(t, appID1, event.ReferenceID)
        assert.Equal(t, si.EventRecord_REQUEST, event.Type)
        assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
        assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
@@ -108,23 +97,18 @@ func TestRequestExceedsUserQuotaEvent(t *testing.T) {
 }
 
 func TestRequestFitsInUserQuotaEvent(t *testing.T) {
-       ask := &AllocationAsk{
-               allocationKey:     "alloc-0",
-               applicationID:     "app-0",
-               allocatedResource: requestResource,
-       }
        eventSystem := mock.NewEventSystemDisabled()
-       events := newAskEvents(ask, eventSystem)
-       events.sendRequestFitsInUserQuota()
+       events := newAskEvents(eventSystem)
+       events.sendRequestFitsInUserQuota(allocKey, appID1, requestResource)
        assert.Equal(t, 0, len(eventSystem.Events))
 
        eventSystem = mock.NewEventSystem()
-       events = newAskEvents(ask, eventSystem)
-       events.sendRequestFitsInUserQuota()
+       events = newAskEvents(eventSystem)
+       events.sendRequestFitsInUserQuota(allocKey, appID1, requestResource)
        assert.Equal(t, 1, len(eventSystem.Events))
        event := eventSystem.Events[0]
        assert.Equal(t, "alloc-0", event.ObjectID)
-       assert.Equal(t, "app-0", event.ReferenceID)
+       assert.Equal(t, appID1, event.ReferenceID)
        assert.Equal(t, si.EventRecord_REQUEST, event.Type)
        assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
        assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
@@ -132,21 +116,17 @@ func TestRequestFitsInUserQuotaEvent(t *testing.T) {
 }
 
 func TestPredicateFailedEvents(t *testing.T) {
-       ask := &AllocationAsk{
-               allocationKey:     "alloc-0",
-               applicationID:     "app-0",
-               allocatedResource: requestResource,
-       }
+       resource := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
        eventSystem := mock.NewEventSystemDisabled()
-       events := newAskEvents(ask, eventSystem)
-       events.sendPredicateFailed("failed")
+       events := newAskEvents(eventSystem)
+       events.sendPredicateFailed("alloc-0", "app-0", "failed", resource)
        assert.Equal(t, 0, len(eventSystem.Events))
 
        eventSystem = mock.NewEventSystem()
-       events = newAskEventsWithRate(ask, eventSystem, 50*time.Millisecond, 1)
+       events = newAskEventsWithRate(eventSystem, 50*time.Millisecond, 1)
        // only the first event is expected to be emitted due to rate limiting
        for i := 0; i < 200; i++ {
-               events.sendPredicateFailed("failure-" + 
strconv.FormatUint(uint64(i), 10))
+               events.sendPredicateFailed("alloc-0", "app-0", 
"failure-"+strconv.FormatUint(uint64(i), 10), resource)
        }
        assert.Equal(t, 1, len(eventSystem.Events))
        event := eventSystem.Events[0]
@@ -155,7 +135,7 @@ func TestPredicateFailedEvents(t *testing.T) {
        eventSystem.Reset()
        // wait a bit, a new event is expected
        time.Sleep(100 * time.Millisecond)
-       events.sendPredicateFailed("failed")
+       events.sendPredicateFailed("alloc-0", "app-0", "failed", resource)
        assert.Equal(t, 1, len(eventSystem.Events))
        event = eventSystem.Events[0]
        assert.Equal(t, "Predicate failed for request 'alloc-0' with message: 
'failed'", event.Message)
diff --git a/pkg/scheduler/objects/node_test.go 
b/pkg/scheduler/objects/node_test.go
index dccb2790..bc17cba6 100644
--- a/pkg/scheduler/objects/node_test.go
+++ b/pkg/scheduler/objects/node_test.go
@@ -785,7 +785,7 @@ func TestPreconditions(t *testing.T) {
        res := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
        ask := newAllocationAsk("test", "app001", res)
        eventSystem := evtMock.NewEventSystem()
-       ask.askEvents = newAskEvents(ask, eventSystem)
+       ask.askEvents = newAskEvents(eventSystem)
        node := NewNode(proto)
 
        // failure


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to