This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new f7d0e102 [YUNIKORN-2982] Send event when preemption occurs (#1000)
f7d0e102 is described below
commit f7d0e102a8423baa43ef0744d803e45b64cca82c
Author: haorenhui <[email protected]>
AuthorDate: Tue Dec 3 11:02:54 2024 +0100
[YUNIKORN-2982] Send event when preemption occurs (#1000)
Closes: #1000
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/scheduler/objects/allocation.go | 5 +++
pkg/scheduler/objects/events/ask_events.go | 9 +++++
pkg/scheduler/objects/events/ask_events_test.go | 20 +++++++++
pkg/scheduler/objects/preemption.go | 1 +
pkg/scheduler/objects/preemption_test.go | 54 +++++++++++++++++++++++++
5 files changed, 89 insertions(+)
diff --git a/pkg/scheduler/objects/allocation.go b/pkg/scheduler/objects/allocation.go
index 17fd531c..a4701cd6 100644
--- a/pkg/scheduler/objects/allocation.go
+++ b/pkg/scheduler/objects/allocation.go
@@ -480,6 +480,11 @@ func (a *Allocation) SendRequiredNodePreemptionFailedEvent(node string) {
a.askEvents.SendRequiredNodePreemptionFailed(a.allocationKey, a.applicationID, node, a.GetAllocatedResource())
}
+// SendPreemptedBySchedulerEvent updates the event system with the preemption event.
+func (a *Allocation) SendPreemptedBySchedulerEvent(preemptorAllocKey, preemptorAppId, preemptorQueuePath string) {
+ a.askEvents.SendPreemptedByScheduler(a.allocationKey, a.applicationID, preemptorAllocKey, preemptorAppId, preemptorQueuePath, a.GetAllocatedResource())
+}
+
// GetAllocationLog returns a list of log entries corresponding to allocation preconditions not being met.
func (a *Allocation) GetAllocationLog() []*AllocationLogEntry {
a.RLock()
diff --git a/pkg/scheduler/objects/events/ask_events.go b/pkg/scheduler/objects/events/ask_events.go
index 0e10a253..db657339 100644
--- a/pkg/scheduler/objects/events/ask_events.go
+++ b/pkg/scheduler/objects/events/ask_events.go
@@ -105,6 +105,15 @@ func (ae *AskEvents) SendRequiredNodePreemptionFailed(allocKey, appID, node stri
ae.eventSystem.AddEvent(event)
}
+func (ae *AskEvents) SendPreemptedByScheduler(allocKey, appID, preemptorAllocKey, preemptorAppId, preemptorQueuePath string, allocatedResource *resources.Resource) {
+ if !ae.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ message := fmt.Sprintf("Preempted by %s from application %s in %s", preemptorAllocKey, preemptorAppId, preemptorQueuePath)
+ event := events.CreateRequestEventRecord(allocKey, appID, message, allocatedResource)
+ ae.eventSystem.AddEvent(event)
+}
+
func NewAskEvents(evt events.EventSystem) *AskEvents {
return newAskEventsWithRate(evt, 15*time.Second, 1)
}
diff --git a/pkg/scheduler/objects/events/ask_events_test.go b/pkg/scheduler/objects/events/ask_events_test.go
index 8f6f8558..d6680600 100644
--- a/pkg/scheduler/objects/events/ask_events_test.go
+++ b/pkg/scheduler/objects/events/ask_events_test.go
@@ -178,3 +178,23 @@ func TestRequiredNodePreemptionFailedEvents(t *testing.T) {
event = eventSystem.Events[0]
assert.Equal(t, "Unschedulable request 'alloc-1' with required node 'node-1', no preemption victim found", event.Message)
}
+
+func TestPreemptedBySchedulerEvents(t *testing.T) {
+ resource := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
+ eventSystem := mock.NewEventSystemDisabled()
+ events := NewAskEvents(eventSystem)
+ events.SendPreemptedByScheduler("alloc-0", appID, "preemptor-0", "preemptor-app-0", "root.parent.child1", resource)
+ assert.Equal(t, 0, len(eventSystem.Events))
+
+ eventSystem = mock.NewEventSystem()
+ events = NewAskEvents(eventSystem)
+ events.SendPreemptedByScheduler("alloc-0", appID, "preemptor-0", "preemptor-app-0", "root.parent.child1", resource)
+ assert.Equal(t, 1, len(eventSystem.Events))
+ event := eventSystem.Events[0]
+ assert.Equal(t, "alloc-0", event.ObjectID)
+ assert.Equal(t, appID, event.ReferenceID)
+ assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+ assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+ assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+ assert.Equal(t, "Preempted by preemptor-0 from application preemptor-app-0 in root.parent.child1", event.Message)
+}
diff --git a/pkg/scheduler/objects/preemption.go b/pkg/scheduler/objects/preemption.go
index 441b12dd..10cacea2 100644
--- a/pkg/scheduler/objects/preemption.go
+++ b/pkg/scheduler/objects/preemption.go
@@ -635,6 +635,7 @@ func (p *Preemptor) TryPreemption() (*AllocationResult, bool) {
zap.String("victimNodeID", victim.GetNodeID()),
zap.String("victimQueue", victimQueue.Name),
)
+ victim.SendPreemptedBySchedulerEvent(p.ask.allocationKey, p.ask.applicationID, p.application.queuePath)
} else {
log.Log(log.SchedPreemption).Warn("BUG: Queue not found for preemption victim",
zap.String("queue", p.queue.Name),
diff --git a/pkg/scheduler/objects/preemption_test.go b/pkg/scheduler/objects/preemption_test.go
index b2fce675..e845da92 100644
--- a/pkg/scheduler/objects/preemption_test.go
+++ b/pkg/scheduler/objects/preemption_test.go
@@ -28,8 +28,10 @@ import (
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/resources"
+ evtMock "github.com/apache/yunikorn-core/pkg/events/mock"
"github.com/apache/yunikorn-core/pkg/mock"
"github.com/apache/yunikorn-core/pkg/plugins"
+ schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -293,6 +295,58 @@ func TestTryPreemption(t *testing.T) {
assert.Equal(t, len(ask3.GetAllocationLog()), 0)
}
+func TestTryPreemption_SendEvent(t *testing.T) {
+ node := newNode(nodeID1, map[string]resources.Quantity{"first": 10, "pods": 5})
+ iterator := getNodeIteratorFn(node)
+ rootQ, err := createRootQueue(map[string]string{"first": "20", "pods": "5"})
+ assert.NilError(t, err)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true, map[string]string{"first": "20"}, map[string]string{"first": "10"})
+ assert.NilError(t, err)
+ childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false, map[string]string{"first": "10"}, map[string]string{"first": "5"})
+ assert.NilError(t, err)
+ childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false, map[string]string{"first": "10"}, map[string]string{"first": "5"})
+ assert.NilError(t, err)
+
+ alloc1, alloc2, err := creatApp1(childQ1, node, nil, map[string]resources.Quantity{"first": 5, "pods": 1})
+ assert.NilError(t, err)
+
+ eventSystem := evtMock.NewEventSystem()
+ events := schedEvt.NewAskEvents(eventSystem)
+ alloc1.askEvents = events
+
+ app2, ask3, err := creatApp2(childQ2, map[string]resources.Quantity{"first": 5, "pods": 1}, "alloc3")
+ assert.NilError(t, err)
+ childQ2.incPendingResource(ask3.GetAllocatedResource())
+
+ headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "pods": 3})
+ preemptor := NewPreemptor(app2, headRoom, 30*time.Second, ask3, iterator(), false)
+
+ // register predicate handler
+ preemptions := []mock.Preemption{
+ mock.NewPreemption(true, "alloc3", nodeID1, []string{"alloc1"}, 0, 0),
+ }
+ plugin := mock.NewPreemptionPredicatePlugin(nil, nil, preemptions)
+ plugins.RegisterSchedulerPlugin(plugin)
+ defer plugins.UnregisterSchedulerPlugins()
+
+ result, ok := preemptor.TryPreemption()
+ assert.Assert(t, result != nil, "no result")
+ assert.NilError(t, plugin.GetPredicateError())
+ assert.Assert(t, ok, "no victims found")
+ assert.Equal(t, "alloc3", result.Request.GetAllocationKey(), "wrong alloc")
+ assert.Check(t, alloc1.IsPreempted(), "alloc1 not preempted")
+ assert.Check(t, !alloc2.IsPreempted(), "alloc2 preempted")
+ assert.Equal(t, 1, len(eventSystem.Events))
+ event := eventSystem.Events[0]
+ assert.Equal(t, alloc1.applicationID, event.ReferenceID)
+ assert.Equal(t, alloc1.allocationKey, event.ObjectID)
+ assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+ assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+ assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+ assert.Equal(t, fmt.Sprintf("Preempted by %s from application %s in %s", "alloc3", appID2, "root.parent.child2"), event.Message)
+ assert.Equal(t, len(ask3.GetAllocationLog()), 0)
+}
+
// TestTryPreemptionOnNode Test try preemption on node with simple queue hierarchy. Since Node doesn't have enough resources to accomodate, preemption happens because of node resource constraint.
// Guaranteed and Max resource set on both victim queue path and preemptor queue path in 2 levels. victim and preemptor queue are siblings.
// Request (Preemptor) resource type matches with all resource types of the victim. But Guaranteed set only on specific resource type. 2 Victims are available, but 1 should be preempted because further preemption would make usage go below the guaranteed quota
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]