This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 72540e2b [YUNIKORN-2552] Recursive locking when sending remove queue event (#841)
72540e2b is described below
commit 72540e2b277ffc3b00ec19b21a69e31da9fbbf09
Author: Peter Bacsko <[email protected]>
AuthorDate: Mon Apr 15 13:18:44 2024 +0200
[YUNIKORN-2552] Recursive locking when sending remove queue event (#841)
Closes: #841
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/scheduler/objects/queue.go | 32 +++++------
pkg/scheduler/objects/queue_events.go | 43 +++++++-------
pkg/scheduler/objects/queue_events_test.go | 91 +++++++++++-------------------
3 files changed, 67 insertions(+), 99 deletions(-)
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 4999fd92..3de469a4 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -142,10 +142,10 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent *Queue) (*Queue, error)
} else {
sq.UpdateQueueProperties()
}
- sq.queueEvents = newQueueEvents(sq, events.GetEventSystem())
+ sq.queueEvents = newQueueEvents(events.GetEventSystem())
log.Log(log.SchedQueue).Info("configured queue added to scheduler",
zap.String("queueName", sq.QueuePath))
- sq.queueEvents.sendNewQueueEvent()
+ sq.queueEvents.sendNewQueueEvent(sq.QueuePath, sq.isManaged)
return sq, nil
}
@@ -200,10 +200,10 @@ func newDynamicQueueInternal(name string, leaf bool, parent *Queue) (*Queue, err
}
sq.UpdateQueueProperties()
- sq.queueEvents = newQueueEvents(sq, events.GetEventSystem())
+ sq.queueEvents = newQueueEvents(events.GetEventSystem())
log.Log(log.SchedQueue).Info("dynamic queue added to scheduler",
zap.String("queueName", sq.QueuePath))
- sq.queueEvents.sendNewQueueEvent()
+ sq.queueEvents.sendNewQueueEvent(sq.QueuePath, sq.isManaged)
return sq, nil
}
@@ -346,7 +346,7 @@ func (sq *Queue) applyConf(conf configs.QueueConfig) error {
}
if prevLeaf != sq.isLeaf && sq.queueEvents != nil {
- sq.queueEvents.sendTypeChangedEvent()
+ sq.queueEvents.sendTypeChangedEvent(sq.QueuePath, sq.isLeaf)
}
if !sq.isLeaf {
@@ -392,7 +392,7 @@ func (sq *Queue) setResources(resource configs.Resources) error {
zap.Stringer("current", sq.maxResource),
zap.Stringer("new", maxResource))
if !resources.Equals(sq.maxResource, maxResource) && sq.queueEvents != nil {
- sq.queueEvents.sendMaxResourceChangedEvent()
+ sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, maxResource)
}
sq.maxResource = maxResource
sq.updateMaxResourceMetrics()
@@ -402,7 +402,7 @@ func (sq *Queue) setResources(resource configs.Resources) error {
zap.Stringer("current", sq.maxResource),
zap.Stringer("new", maxResource))
if sq.queueEvents != nil {
- sq.queueEvents.sendMaxResourceChangedEvent()
+ sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, maxResource)
}
sq.maxResource = nil
sq.updateMaxResourceMetrics()
@@ -418,7 +418,7 @@ func (sq *Queue) setResources(resource configs.Resources) error {
zap.Stringer("current", sq.guaranteedResource),
zap.Stringer("new", guaranteedResource))
if !resources.Equals(sq.guaranteedResource, guaranteedResource) && sq.queueEvents != nil {
- sq.queueEvents.sendGuaranteedResourceChangedEvent()
+ sq.queueEvents.sendGuaranteedResourceChangedEvent(sq.QueuePath, guaranteedResource)
}
sq.guaranteedResource = guaranteedResource
sq.updateGuaranteedResourceMetrics()
@@ -428,7 +428,7 @@ func (sq *Queue) setResources(resource configs.Resources) error {
zap.Stringer("current", sq.guaranteedResource),
zap.Stringer("new", guaranteedResource))
if sq.queueEvents != nil {
- sq.queueEvents.sendGuaranteedResourceChangedEvent()
+ sq.queueEvents.sendGuaranteedResourceChangedEvent(sq.QueuePath, guaranteedResource)
}
sq.guaranteedResource = nil
sq.updateGuaranteedResourceMetrics()
@@ -736,7 +736,7 @@ func (sq *Queue) AddApplication(app *Application) {
defer sq.Unlock()
appID := app.ApplicationID
sq.applications[appID] = app
- sq.queueEvents.sendNewApplicationEvent(appID)
+ sq.queueEvents.sendNewApplicationEvent(sq.QueuePath, appID)
// YUNIKORN-199: update the quota from the namespace
// get the tag with the quota
quota := app.GetTag(siCommon.AppTagNamespaceResourceQuota)
@@ -808,7 +808,7 @@ func (sq *Queue) RemoveApplication(app *Application) {
zap.String("applicationID", appID))
return
}
- sq.queueEvents.sendRemoveApplicationEvent(appID)
+ sq.queueEvents.sendRemoveApplicationEvent(sq.QueuePath, appID)
if appPending := app.GetPendingResource(); !resources.IsZero(appPending) {
sq.decPendingResource(appPending)
}
@@ -1010,7 +1010,7 @@ func (sq *Queue) RemoveQueue() bool {
log.Log(log.SchedQueue).Info("removing queue", zap.String("queue", sq.QueuePath))
// root is always managed and is the only queue with a nil parent: no need to guard
sq.parent.removeChildQueue(sq.Name)
- sq.queueEvents.sendRemoveQueueEvent()
+ sq.queueEvents.sendRemoveQueueEvent(sq.QueuePath, sq.isManaged)
return true
}
@@ -1322,7 +1322,7 @@ func (sq *Queue) SetMaxResource(max *resources.Resource) {
zap.Stringer("current", sq.maxResource),
zap.Stringer("new", max))
if !resources.Equals(sq.maxResource, max) && sq.queueEvents != nil {
- sq.queueEvents.sendMaxResourceChangedEvent()
+ sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, sq.maxResource)
}
sq.maxResource = max.Clone()
sq.updateMaxResourceMetrics()
@@ -1332,7 +1332,7 @@ func (sq *Queue) SetMaxResource(max *resources.Resource) {
zap.Stringer("current", sq.maxResource),
zap.Stringer("new", max))
if sq.queueEvents != nil {
- sq.queueEvents.sendMaxResourceChangedEvent()
+ sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, sq.maxResource)
}
sq.maxResource = nil
sq.updateMaxResourceMetrics()
@@ -1987,7 +1987,3 @@ func (sq *Queue) recalculatePriority() int32 {
sq.currentPriority = curr
return priorityValueByPolicy(sq.priorityPolicy, sq.priorityOffset, curr)
}
-
-func (sq *Queue) SendRemoveQueueEvent() {
- sq.queueEvents.sendRemoveQueueEvent()
-}
diff --git a/pkg/scheduler/objects/queue_events.go b/pkg/scheduler/objects/queue_events.go
index 35160484..7ef24d3f 100644
--- a/pkg/scheduler/objects/queue_events.go
+++ b/pkg/scheduler/objects/queue_events.go
@@ -20,93 +20,92 @@ package objects
import (
"github.com/apache/yunikorn-core/pkg/common"
+ "github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
type queueEvents struct {
eventSystem events.EventSystem
- queue *Queue
}
-func (q *queueEvents) sendNewQueueEvent() {
+func (q *queueEvents) sendNewQueueEvent(queuePath string, managed bool) {
if !q.eventSystem.IsEventTrackingEnabled() {
return
}
detail := si.EventRecord_QUEUE_DYNAMIC
- if q.queue.IsManaged() {
+ if managed {
detail = si.EventRecord_DETAILS_NONE
}
- event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, common.Empty, si.EventRecord_ADD,
+ event := events.CreateQueueEventRecord(queuePath, common.Empty, common.Empty, si.EventRecord_ADD,
detail, nil)
q.eventSystem.AddEvent(event)
}
-func (q *queueEvents) sendNewApplicationEvent(appID string) {
+func (q *queueEvents) sendNewApplicationEvent(queuePath, appID string) {
if !q.eventSystem.IsEventTrackingEnabled() {
return
}
- event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, appID, si.EventRecord_ADD,
+ event := events.CreateQueueEventRecord(queuePath, common.Empty, appID, si.EventRecord_ADD,
si.EventRecord_QUEUE_APP, nil)
q.eventSystem.AddEvent(event)
}
-func (q *queueEvents) sendRemoveQueueEvent() {
+func (q *queueEvents) sendRemoveQueueEvent(queuePath string, managed bool) {
if !q.eventSystem.IsEventTrackingEnabled() {
return
}
detail := si.EventRecord_QUEUE_DYNAMIC
- if q.queue.IsManaged() {
+ if managed {
detail = si.EventRecord_DETAILS_NONE
}
- event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, common.Empty, si.EventRecord_REMOVE,
+ event := events.CreateQueueEventRecord(queuePath, common.Empty, common.Empty, si.EventRecord_REMOVE,
detail, nil)
q.eventSystem.AddEvent(event)
}
-func (q *queueEvents) sendRemoveApplicationEvent(appID string) {
+func (q *queueEvents) sendRemoveApplicationEvent(queuePath, appID string) {
if !q.eventSystem.IsEventTrackingEnabled() {
return
}
- event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, appID, si.EventRecord_REMOVE,
+ event := events.CreateQueueEventRecord(queuePath, common.Empty, appID, si.EventRecord_REMOVE,
si.EventRecord_QUEUE_APP, nil)
q.eventSystem.AddEvent(event)
}
-func (q *queueEvents) sendMaxResourceChangedEvent() {
+func (q *queueEvents) sendMaxResourceChangedEvent(queuePath string, maxResource *resources.Resource) {
if !q.eventSystem.IsEventTrackingEnabled() {
return
}
- event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, common.Empty, si.EventRecord_SET,
- si.EventRecord_QUEUE_MAX, q.queue.maxResource)
+ event := events.CreateQueueEventRecord(queuePath, common.Empty, common.Empty, si.EventRecord_SET,
+ si.EventRecord_QUEUE_MAX, maxResource)
q.eventSystem.AddEvent(event)
}
-func (q *queueEvents) sendGuaranteedResourceChangedEvent() {
+func (q *queueEvents) sendGuaranteedResourceChangedEvent(queuePath string, guaranteed *resources.Resource) {
if !q.eventSystem.IsEventTrackingEnabled() {
return
}
- event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, common.Empty, si.EventRecord_SET,
- si.EventRecord_QUEUE_GUARANTEED, q.queue.guaranteedResource)
+ event := events.CreateQueueEventRecord(queuePath, common.Empty, common.Empty, si.EventRecord_SET,
+ si.EventRecord_QUEUE_GUARANTEED, guaranteed)
q.eventSystem.AddEvent(event)
}
-func (q *queueEvents) sendTypeChangedEvent() {
+func (q *queueEvents) sendTypeChangedEvent(queuePath string, isLeaf bool) {
if !q.eventSystem.IsEventTrackingEnabled() {
return
}
message := "leaf queue: false"
- if q.queue.isLeaf {
+ if isLeaf {
message = "leaf queue: true"
}
- event := events.CreateQueueEventRecord(q.queue.QueuePath, message, common.Empty, si.EventRecord_SET,
+ event := events.CreateQueueEventRecord(queuePath, message, common.Empty, si.EventRecord_SET,
si.EventRecord_QUEUE_TYPE, nil)
q.eventSystem.AddEvent(event)
}
-func newQueueEvents(queue *Queue, evt events.EventSystem) *queueEvents {
+func newQueueEvents(evt events.EventSystem) *queueEvents {
return &queueEvents{
eventSystem: evt,
- queue: queue,
}
}
diff --git a/pkg/scheduler/objects/queue_events_test.go b/pkg/scheduler/objects/queue_events_test.go
index d36bf36d..07d781fb 100644
--- a/pkg/scheduler/objects/queue_events_test.go
+++ b/pkg/scheduler/objects/queue_events_test.go
@@ -39,13 +39,13 @@ func TestSendNewQueueEvent(t *testing.T) {
isManaged: true,
}
eventSystem := mock.NewEventSystemDisabled()
- nq := newQueueEvents(queue, eventSystem)
- nq.sendNewQueueEvent()
+ nq := newQueueEvents(eventSystem)
+ nq.sendNewQueueEvent(queue.QueuePath, false)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- nq = newQueueEvents(queue, eventSystem)
- nq.sendNewQueueEvent()
+ nq = newQueueEvents(eventSystem)
+ nq.sendNewQueueEvent(queue.QueuePath, true)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -56,28 +56,21 @@ func TestSendNewQueueEvent(t *testing.T) {
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
assert.Equal(t, 0, len(event.Resource.Resources))
eventSystem = mock.NewEventSystem()
- nq = newQueueEvents(&Queue{
- QueuePath: testQueuePath,
- isManaged: false,
- }, eventSystem)
- nq.sendNewQueueEvent()
+ nq = newQueueEvents(eventSystem)
+ nq.sendNewQueueEvent(queue.QueuePath, false)
event = eventSystem.Events[0]
assert.Equal(t, si.EventRecord_QUEUE_DYNAMIC, event.EventChangeDetail)
}
func TestSendRemoveQueueEvent(t *testing.T) {
- queue := &Queue{
- QueuePath: testQueuePath,
- isManaged: true,
- }
eventSystem := mock.NewEventSystemDisabled()
- nq := newQueueEvents(queue, eventSystem)
- nq.sendRemoveQueueEvent()
+ nq := newQueueEvents(eventSystem)
+ nq.sendRemoveQueueEvent(testQueuePath, true)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- nq = newQueueEvents(queue, eventSystem)
- nq.sendRemoveQueueEvent()
+ nq = newQueueEvents(eventSystem)
+ nq.sendRemoveQueueEvent(testQueuePath, true)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -88,27 +81,21 @@ func TestSendRemoveQueueEvent(t *testing.T) {
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
assert.Equal(t, 0, len(event.Resource.Resources))
eventSystem = mock.NewEventSystem()
- nq = newQueueEvents(&Queue{
- QueuePath: testQueuePath,
- isManaged: false,
- }, eventSystem)
- nq.sendRemoveQueueEvent()
+ nq = newQueueEvents(eventSystem)
+ nq.sendRemoveQueueEvent(testQueuePath, false)
event = eventSystem.Events[0]
assert.Equal(t, si.EventRecord_QUEUE_DYNAMIC, event.EventChangeDetail)
}
func TestNewApplicationEvent(t *testing.T) {
- queue := &Queue{
- QueuePath: testQueuePath,
- }
eventSystem := mock.NewEventSystemDisabled()
- nq := newQueueEvents(queue, eventSystem)
- nq.sendNewApplicationEvent(appID0)
+ nq := newQueueEvents(eventSystem)
+ nq.sendNewApplicationEvent(testQueuePath, appID0)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- nq = newQueueEvents(queue, eventSystem)
- nq.sendNewApplicationEvent(appID0)
+ nq = newQueueEvents(eventSystem)
+ nq.sendNewApplicationEvent(testQueuePath, appID0)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -121,17 +108,14 @@ func TestNewApplicationEvent(t *testing.T) {
}
func TestRemoveApplicationEvent(t *testing.T) {
- queue := &Queue{
- QueuePath: testQueuePath,
- }
eventSystem := mock.NewEventSystemDisabled()
- nq := newQueueEvents(queue, eventSystem)
- nq.sendRemoveApplicationEvent(appID0)
+ nq := newQueueEvents(eventSystem)
+ nq.sendRemoveApplicationEvent(testQueuePath, appID0)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- nq = newQueueEvents(queue, eventSystem)
- nq.sendRemoveApplicationEvent(appID0)
+ nq = newQueueEvents(eventSystem)
+ nq.sendRemoveApplicationEvent(testQueuePath, appID0)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -144,17 +128,14 @@ func TestRemoveApplicationEvent(t *testing.T) {
}
func TestTypeChangedEvent(t *testing.T) {
- queue := &Queue{
- QueuePath: testQueuePath,
- }
eventSystem := mock.NewEventSystemDisabled()
- nq := newQueueEvents(queue, eventSystem)
- nq.sendTypeChangedEvent()
+ nq := newQueueEvents(eventSystem)
+ nq.sendTypeChangedEvent(testQueuePath, true)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- nq = newQueueEvents(queue, eventSystem)
- nq.sendTypeChangedEvent()
+ nq = newQueueEvents(eventSystem)
+ nq.sendTypeChangedEvent(testQueuePath, false)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -168,18 +149,14 @@ func TestTypeChangedEvent(t *testing.T) {
func TestSendMaxResourceChangedEvent(t *testing.T) {
maxRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
- queue := &Queue{
- QueuePath: testQueuePath,
- maxResource: maxRes,
- }
eventSystem := mock.NewEventSystemDisabled()
- nq := newQueueEvents(queue, eventSystem)
- nq.sendMaxResourceChangedEvent()
+ nq := newQueueEvents(eventSystem)
+ nq.sendMaxResourceChangedEvent(testQueuePath, maxRes)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- nq = newQueueEvents(queue, eventSystem)
- nq.sendMaxResourceChangedEvent()
+ nq = newQueueEvents(eventSystem)
+ nq.sendMaxResourceChangedEvent(testQueuePath, maxRes)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -195,18 +172,14 @@ func TestSendMaxResourceChangedEvent(t *testing.T) {
func TestSendGuaranteedResourceChangedEvent(t *testing.T) {
guaranteed := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
- queue := &Queue{
- QueuePath: testQueuePath,
- guaranteedResource: guaranteed,
- }
eventSystem := mock.NewEventSystemDisabled()
- nq := newQueueEvents(queue, eventSystem)
- nq.sendGuaranteedResourceChangedEvent()
+ nq := newQueueEvents(eventSystem)
+ nq.sendGuaranteedResourceChangedEvent(testQueuePath, guaranteed)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- nq = newQueueEvents(queue, eventSystem)
- nq.sendGuaranteedResourceChangedEvent()
+ nq = newQueueEvents(eventSystem)
+ nq.sendGuaranteedResourceChangedEvent(testQueuePath, guaranteed)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_QUEUE, event.Type)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]