This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new f7d0e102 [YUNIKORN-2982] Send event when preemption occurs (#1000)
f7d0e102 is described below

commit f7d0e102a8423baa43ef0744d803e45b64cca82c
Author: haorenhui <[email protected]>
AuthorDate: Tue Dec 3 11:02:54 2024 +0100

    [YUNIKORN-2982] Send event when preemption occurs (#1000)
    
    Closes: #1000
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/scheduler/objects/allocation.go             |  5 +++
 pkg/scheduler/objects/events/ask_events.go      |  9 +++++
 pkg/scheduler/objects/events/ask_events_test.go | 20 +++++++++
 pkg/scheduler/objects/preemption.go             |  1 +
 pkg/scheduler/objects/preemption_test.go        | 54 +++++++++++++++++++++++++
 5 files changed, 89 insertions(+)

diff --git a/pkg/scheduler/objects/allocation.go 
b/pkg/scheduler/objects/allocation.go
index 17fd531c..a4701cd6 100644
--- a/pkg/scheduler/objects/allocation.go
+++ b/pkg/scheduler/objects/allocation.go
@@ -480,6 +480,11 @@ func (a *Allocation) 
SendRequiredNodePreemptionFailedEvent(node string) {
        a.askEvents.SendRequiredNodePreemptionFailed(a.allocationKey, 
a.applicationID, node, a.GetAllocatedResource())
 }
 
+// SendPreemptedBySchedulerEvent updates the event system with the preemption 
event.
+func (a *Allocation) SendPreemptedBySchedulerEvent(preemptorAllocKey, 
preemptorAppId, preemptorQueuePath string) {
+       a.askEvents.SendPreemptedByScheduler(a.allocationKey, a.applicationID, 
preemptorAllocKey, preemptorAppId, preemptorQueuePath, a.GetAllocatedResource())
+}
+
 // GetAllocationLog returns a list of log entries corresponding to allocation 
preconditions not being met.
 func (a *Allocation) GetAllocationLog() []*AllocationLogEntry {
        a.RLock()
diff --git a/pkg/scheduler/objects/events/ask_events.go 
b/pkg/scheduler/objects/events/ask_events.go
index 0e10a253..db657339 100644
--- a/pkg/scheduler/objects/events/ask_events.go
+++ b/pkg/scheduler/objects/events/ask_events.go
@@ -105,6 +105,15 @@ func (ae *AskEvents) 
SendRequiredNodePreemptionFailed(allocKey, appID, node stri
        ae.eventSystem.AddEvent(event)
 }
 
+func (ae *AskEvents) SendPreemptedByScheduler(allocKey, appID, 
preemptorAllocKey, preemptorAppId, preemptorQueuePath string, allocatedResource 
*resources.Resource) {
+       if !ae.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       message := fmt.Sprintf("Preempted by %s from application %s in %s", 
preemptorAllocKey, preemptorAppId, preemptorQueuePath)
+       event := events.CreateRequestEventRecord(allocKey, appID, message, 
allocatedResource)
+       ae.eventSystem.AddEvent(event)
+}
+
 func NewAskEvents(evt events.EventSystem) *AskEvents {
        return newAskEventsWithRate(evt, 15*time.Second, 1)
 }
diff --git a/pkg/scheduler/objects/events/ask_events_test.go 
b/pkg/scheduler/objects/events/ask_events_test.go
index 8f6f8558..d6680600 100644
--- a/pkg/scheduler/objects/events/ask_events_test.go
+++ b/pkg/scheduler/objects/events/ask_events_test.go
@@ -178,3 +178,23 @@ func TestRequiredNodePreemptionFailedEvents(t *testing.T) {
        event = eventSystem.Events[0]
        assert.Equal(t, "Unschedulable request 'alloc-1' with required node 
'node-1', no preemption victim found", event.Message)
 }
+
+func TestPreemptedBySchedulerEvents(t *testing.T) {
+       resource := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
+       eventSystem := mock.NewEventSystemDisabled()
+       events := NewAskEvents(eventSystem)
+       events.SendPreemptedByScheduler("alloc-0", appID, "preemptor-0", 
"preemptor-app-0", "root.parent.child1", resource)
+       assert.Equal(t, 0, len(eventSystem.Events))
+
+       eventSystem = mock.NewEventSystem()
+       events = NewAskEvents(eventSystem)
+       events.SendPreemptedByScheduler("alloc-0", appID, "preemptor-0", 
"preemptor-app-0", "root.parent.child1", resource)
+       assert.Equal(t, 1, len(eventSystem.Events))
+       event := eventSystem.Events[0]
+       assert.Equal(t, "alloc-0", event.ObjectID)
+       assert.Equal(t, appID, event.ReferenceID)
+       assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+       assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+       assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+       assert.Equal(t, "Preempted by preemptor-0 from application 
preemptor-app-0 in root.parent.child1", event.Message)
+}
diff --git a/pkg/scheduler/objects/preemption.go 
b/pkg/scheduler/objects/preemption.go
index 441b12dd..10cacea2 100644
--- a/pkg/scheduler/objects/preemption.go
+++ b/pkg/scheduler/objects/preemption.go
@@ -635,6 +635,7 @@ func (p *Preemptor) TryPreemption() (*AllocationResult, 
bool) {
                                zap.String("victimNodeID", victim.GetNodeID()),
                                zap.String("victimQueue", victimQueue.Name),
                        )
+                       
victim.SendPreemptedBySchedulerEvent(p.ask.allocationKey, p.ask.applicationID, 
p.application.queuePath)
                } else {
                        log.Log(log.SchedPreemption).Warn("BUG: Queue not found 
for preemption victim",
                                zap.String("queue", p.queue.Name),
diff --git a/pkg/scheduler/objects/preemption_test.go 
b/pkg/scheduler/objects/preemption_test.go
index b2fce675..e845da92 100644
--- a/pkg/scheduler/objects/preemption_test.go
+++ b/pkg/scheduler/objects/preemption_test.go
@@ -28,8 +28,10 @@ import (
 
        "github.com/apache/yunikorn-core/pkg/common"
        "github.com/apache/yunikorn-core/pkg/common/resources"
+       evtMock "github.com/apache/yunikorn-core/pkg/events/mock"
        "github.com/apache/yunikorn-core/pkg/mock"
        "github.com/apache/yunikorn-core/pkg/plugins"
+       schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
@@ -293,6 +295,58 @@ func TestTryPreemption(t *testing.T) {
        assert.Equal(t, len(ask3.GetAllocationLog()), 0)
 }
 
+func TestTryPreemption_SendEvent(t *testing.T) {
+       node := newNode(nodeID1, map[string]resources.Quantity{"first": 10, 
"pods": 5})
+       iterator := getNodeIteratorFn(node)
+       rootQ, err := createRootQueue(map[string]string{"first": "20", "pods": 
"5"})
+       assert.NilError(t, err)
+       parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true, 
map[string]string{"first": "20"}, map[string]string{"first": "10"})
+       assert.NilError(t, err)
+       childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false, 
map[string]string{"first": "10"}, map[string]string{"first": "5"})
+       assert.NilError(t, err)
+       childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false, 
map[string]string{"first": "10"}, map[string]string{"first": "5"})
+       assert.NilError(t, err)
+
+       alloc1, alloc2, err := creatApp1(childQ1, node, nil, 
map[string]resources.Quantity{"first": 5, "pods": 1})
+       assert.NilError(t, err)
+
+       eventSystem := evtMock.NewEventSystem()
+       events := schedEvt.NewAskEvents(eventSystem)
+       alloc1.askEvents = events
+
+       app2, ask3, err := creatApp2(childQ2, 
map[string]resources.Quantity{"first": 5, "pods": 1}, "alloc3")
+       assert.NilError(t, err)
+       childQ2.incPendingResource(ask3.GetAllocatedResource())
+
+       headRoom := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "pods": 
3})
+       preemptor := NewPreemptor(app2, headRoom, 30*time.Second, ask3, 
iterator(), false)
+
+       // register predicate handler
+       preemptions := []mock.Preemption{
+               mock.NewPreemption(true, "alloc3", nodeID1, []string{"alloc1"}, 
0, 0),
+       }
+       plugin := mock.NewPreemptionPredicatePlugin(nil, nil, preemptions)
+       plugins.RegisterSchedulerPlugin(plugin)
+       defer plugins.UnregisterSchedulerPlugins()
+
+       result, ok := preemptor.TryPreemption()
+       assert.Assert(t, result != nil, "no result")
+       assert.NilError(t, plugin.GetPredicateError())
+       assert.Assert(t, ok, "no victims found")
+       assert.Equal(t, "alloc3", result.Request.GetAllocationKey(), "wrong 
alloc")
+       assert.Check(t, alloc1.IsPreempted(), "alloc1 not preempted")
+       assert.Check(t, !alloc2.IsPreempted(), "alloc2 preempted")
+       assert.Equal(t, 1, len(eventSystem.Events))
+       event := eventSystem.Events[0]
+       assert.Equal(t, alloc1.applicationID, event.ReferenceID)
+       assert.Equal(t, alloc1.allocationKey, event.ObjectID)
+       assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+       assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+       assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+       assert.Equal(t, fmt.Sprintf("Preempted by %s from application %s in 
%s", "alloc3", appID2, "root.parent.child2"), event.Message)
+       assert.Equal(t, len(ask3.GetAllocationLog()), 0)
+}
+
 // TestTryPreemptionOnNode Test try preemption on node with simple queue 
hierarchy. Since Node doesn't have enough resources to accommodate, preemption 
happens because of node resource constraint.
 // Guaranteed and Max resource set on both victim queue path and preemptor 
queue path in 2 levels. victim and preemptor queue are siblings.
 // Request (Preemptor) resource type matches with all resource types of the 
victim. But Guaranteed set only on specific resource type. 2 Victims are 
available, but 1 should be preempted because further preemption would make 
usage go below the guaranteed quota


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to