This is an automated email from the ASF dual-hosted git repository. pbacsko pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
commit 78c849e95b86b0843d8b2b7cf9d901b0fcfec1dc Author: Peter Bacsko <pbac...@cloudera.com> AuthorDate: Mon Apr 15 13:18:44 2024 +0200 [YUNIKORN-2552] Recursive locking when sending remove queue event (#841) Closes: #841 Signed-off-by: Peter Bacsko <pbac...@cloudera.com> --- pkg/scheduler/objects/queue.go | 32 +++++------ pkg/scheduler/objects/queue_events.go | 43 +++++++------- pkg/scheduler/objects/queue_events_test.go | 91 +++++++++++------------------- 3 files changed, 67 insertions(+), 99 deletions(-) diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go index 827c0aca..c45b4ac5 100644 --- a/pkg/scheduler/objects/queue.go +++ b/pkg/scheduler/objects/queue.go @@ -142,10 +142,10 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent *Queue) (*Queue, error) } else { sq.UpdateQueueProperties() } - sq.queueEvents = newQueueEvents(sq, events.GetEventSystem()) + sq.queueEvents = newQueueEvents(events.GetEventSystem()) log.Log(log.SchedQueue).Info("configured queue added to scheduler", zap.String("queueName", sq.QueuePath)) - sq.queueEvents.sendNewQueueEvent() + sq.queueEvents.sendNewQueueEvent(sq.QueuePath, sq.isManaged) return sq, nil } @@ -200,10 +200,10 @@ func newDynamicQueueInternal(name string, leaf bool, parent *Queue) (*Queue, err } sq.UpdateQueueProperties() - sq.queueEvents = newQueueEvents(sq, events.GetEventSystem()) + sq.queueEvents = newQueueEvents(events.GetEventSystem()) log.Log(log.SchedQueue).Info("dynamic queue added to scheduler", zap.String("queueName", sq.QueuePath)) - sq.queueEvents.sendNewQueueEvent() + sq.queueEvents.sendNewQueueEvent(sq.QueuePath, sq.isManaged) return sq, nil } @@ -338,7 +338,7 @@ func (sq *Queue) applyConf(conf configs.QueueConfig) error { } if prevLeaf != sq.isLeaf && sq.queueEvents != nil { - sq.queueEvents.sendTypeChangedEvent() + sq.queueEvents.sendTypeChangedEvent(sq.QueuePath, sq.isLeaf) } if !sq.isLeaf { @@ -384,7 +384,7 @@ func (sq *Queue) setResources(resource configs.Resources) error { zap.Stringer("current", sq.maxResource), zap.Stringer("new", maxResource)) if !resources.Equals(sq.maxResource, maxResource) && sq.queueEvents != nil { - sq.queueEvents.sendMaxResourceChangedEvent() + sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, maxResource) } sq.maxResource = maxResource sq.updateMaxResourceMetrics() @@ -394,7 +394,7 @@ func (sq *Queue) setResources(resource configs.Resources) error { zap.Stringer("current", sq.maxResource), zap.Stringer("new", maxResource)) if sq.queueEvents != nil { - sq.queueEvents.sendMaxResourceChangedEvent() + sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, maxResource) } sq.maxResource = nil sq.updateMaxResourceMetrics() @@ -410,7 +410,7 @@ func (sq *Queue) setResources(resource configs.Resources) error { zap.Stringer("current", sq.guaranteedResource), zap.Stringer("new", guaranteedResource)) if !resources.Equals(sq.guaranteedResource, guaranteedResource) && sq.queueEvents != nil { - sq.queueEvents.sendGuaranteedResourceChangedEvent() + sq.queueEvents.sendGuaranteedResourceChangedEvent(sq.QueuePath, guaranteedResource) } sq.guaranteedResource = guaranteedResource sq.updateGuaranteedResourceMetrics() @@ -420,7 +420,7 @@ func (sq *Queue) setResources(resource configs.Resources) error { zap.Stringer("current", sq.guaranteedResource), zap.Stringer("new", guaranteedResource)) if sq.queueEvents != nil { - sq.queueEvents.sendGuaranteedResourceChangedEvent() + sq.queueEvents.sendGuaranteedResourceChangedEvent(sq.QueuePath, guaranteedResource) } sq.guaranteedResource = nil sq.updateGuaranteedResourceMetrics() @@ -728,7 +728,7 @@ func (sq *Queue) AddApplication(app *Application) { defer sq.Unlock() appID := app.ApplicationID sq.applications[appID] = app - sq.queueEvents.sendNewApplicationEvent(appID) + sq.queueEvents.sendNewApplicationEvent(sq.QueuePath, appID) // YUNIKORN-199: update the quota from the namespace // get the tag with the quota quota := app.GetTag(siCommon.AppTagNamespaceResourceQuota) @@ -800,7 +800,7 @@ func (sq *Queue) RemoveApplication(app *Application) { zap.String("applicationID", appID)) return } - sq.queueEvents.sendRemoveApplicationEvent(appID) + sq.queueEvents.sendRemoveApplicationEvent(sq.QueuePath, appID) if appPending := app.GetPendingResource(); !resources.IsZero(appPending) { sq.decPendingResource(appPending) } @@ -1002,7 +1002,7 @@ func (sq *Queue) RemoveQueue() bool { log.Log(log.SchedQueue).Info("removing queue", zap.String("queue", sq.QueuePath)) // root is always managed and is the only queue with a nil parent: no need to guard sq.parent.removeChildQueue(sq.Name) - sq.queueEvents.sendRemoveQueueEvent() + sq.queueEvents.sendRemoveQueueEvent(sq.QueuePath, sq.isManaged) return true } @@ -1323,7 +1323,7 @@ func (sq *Queue) SetMaxResource(max *resources.Resource) { zap.Stringer("current", sq.maxResource), zap.Stringer("new", max)) if !resources.Equals(sq.maxResource, max) && sq.queueEvents != nil { - sq.queueEvents.sendMaxResourceChangedEvent() + sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, sq.maxResource) } sq.maxResource = max.Clone() sq.updateMaxResourceMetrics() @@ -1333,7 +1333,7 @@ func (sq *Queue) SetMaxResource(max *resources.Resource) { zap.Stringer("current", sq.maxResource), zap.Stringer("new", max)) if sq.queueEvents != nil { - sq.queueEvents.sendMaxResourceChangedEvent() + sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, sq.maxResource) } sq.maxResource = nil sq.updateMaxResourceMetrics() @@ -1989,7 +1989,3 @@ func (sq *Queue) recalculatePriority() int32 { sq.currentPriority = curr return priorityValueByPolicy(sq.priorityPolicy, sq.priorityOffset, curr) } - -func (sq *Queue) SendRemoveQueueEvent() { - sq.queueEvents.sendRemoveQueueEvent() -} diff --git a/pkg/scheduler/objects/queue_events.go b/pkg/scheduler/objects/queue_events.go index 35160484..7ef24d3f 100644 --- a/pkg/scheduler/objects/queue_events.go +++ b/pkg/scheduler/objects/queue_events.go @@ -20,93 +20,92 @@ package objects import ( "github.com/apache/yunikorn-core/pkg/common" + "github.com/apache/yunikorn-core/pkg/common/resources" "github.com/apache/yunikorn-core/pkg/events" "github.com/apache/yunikorn-scheduler-interface/lib/go/si" ) type queueEvents struct { eventSystem events.EventSystem - queue *Queue } -func (q *queueEvents) sendNewQueueEvent() { +func (q *queueEvents) sendNewQueueEvent(queuePath string, managed bool) { if !q.eventSystem.IsEventTrackingEnabled() { return } detail := si.EventRecord_QUEUE_DYNAMIC - if q.queue.IsManaged() { + if managed { detail = si.EventRecord_DETAILS_NONE } - event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, common.Empty, si.EventRecord_ADD, + event := events.CreateQueueEventRecord(queuePath, common.Empty, common.Empty, si.EventRecord_ADD, detail, nil) q.eventSystem.AddEvent(event) } -func (q *queueEvents) sendNewApplicationEvent(appID string) { +func (q *queueEvents) sendNewApplicationEvent(queuePath, appID string) { if !q.eventSystem.IsEventTrackingEnabled() { return } - event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, appID, si.EventRecord_ADD, + event := events.CreateQueueEventRecord(queuePath, common.Empty, appID, si.EventRecord_ADD, si.EventRecord_QUEUE_APP, nil) q.eventSystem.AddEvent(event) } -func (q *queueEvents) sendRemoveQueueEvent() { +func (q *queueEvents) sendRemoveQueueEvent(queuePath string, managed bool) { if !q.eventSystem.IsEventTrackingEnabled() { return } detail := si.EventRecord_QUEUE_DYNAMIC - if q.queue.IsManaged() { + if managed { detail = si.EventRecord_DETAILS_NONE } - event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, common.Empty, si.EventRecord_REMOVE, + event := events.CreateQueueEventRecord(queuePath, common.Empty, common.Empty, si.EventRecord_REMOVE, detail, nil) q.eventSystem.AddEvent(event) } -func (q *queueEvents) sendRemoveApplicationEvent(appID string) { +func (q *queueEvents) sendRemoveApplicationEvent(queuePath, appID string) { if !q.eventSystem.IsEventTrackingEnabled() { return } - event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, appID, si.EventRecord_REMOVE, + event := events.CreateQueueEventRecord(queuePath, common.Empty, appID, si.EventRecord_REMOVE, si.EventRecord_QUEUE_APP, nil) q.eventSystem.AddEvent(event) } -func (q *queueEvents) sendMaxResourceChangedEvent() { +func (q *queueEvents) sendMaxResourceChangedEvent(queuePath string, maxResource *resources.Resource) { if !q.eventSystem.IsEventTrackingEnabled() { return } - event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, common.Empty, si.EventRecord_SET, - si.EventRecord_QUEUE_MAX, q.queue.maxResource) + event := events.CreateQueueEventRecord(queuePath, common.Empty, common.Empty, si.EventRecord_SET, + si.EventRecord_QUEUE_MAX, maxResource) q.eventSystem.AddEvent(event) } -func (q *queueEvents) sendGuaranteedResourceChangedEvent() { +func (q *queueEvents) sendGuaranteedResourceChangedEvent(queuePath string, guaranteed *resources.Resource) { if !q.eventSystem.IsEventTrackingEnabled() { return } - event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, common.Empty, si.EventRecord_SET, - si.EventRecord_QUEUE_GUARANTEED, q.queue.guaranteedResource) + event := events.CreateQueueEventRecord(queuePath, common.Empty, common.Empty, si.EventRecord_SET, + si.EventRecord_QUEUE_GUARANTEED, guaranteed) q.eventSystem.AddEvent(event) } -func (q *queueEvents) sendTypeChangedEvent() { +func (q *queueEvents) sendTypeChangedEvent(queuePath string, isLeaf bool) { if !q.eventSystem.IsEventTrackingEnabled() { return } message := "leaf queue: false" - if q.queue.isLeaf { + if isLeaf { message = "leaf queue: true" } - event := events.CreateQueueEventRecord(q.queue.QueuePath, message, common.Empty, si.EventRecord_SET, + event := events.CreateQueueEventRecord(queuePath, message, common.Empty, si.EventRecord_SET, si.EventRecord_QUEUE_TYPE, nil) q.eventSystem.AddEvent(event) } -func newQueueEvents(queue *Queue, evt events.EventSystem) *queueEvents { +func newQueueEvents(evt events.EventSystem) *queueEvents { return &queueEvents{ eventSystem: evt, - queue: queue, } } diff --git a/pkg/scheduler/objects/queue_events_test.go b/pkg/scheduler/objects/queue_events_test.go index d36bf36d..07d781fb 100644 --- a/pkg/scheduler/objects/queue_events_test.go +++ b/pkg/scheduler/objects/queue_events_test.go @@ -39,13 +39,13 @@ func TestSendNewQueueEvent(t *testing.T) { isManaged: true, } eventSystem := mock.NewEventSystemDisabled() - nq := newQueueEvents(queue, eventSystem) - nq.sendNewQueueEvent() + nq := newQueueEvents(eventSystem) + nq.sendNewQueueEvent(queue.QueuePath, false) assert.Equal(t, 0, len(eventSystem.Events), "unexpected event") eventSystem = mock.NewEventSystem() - nq = newQueueEvents(queue, eventSystem) - nq.sendNewQueueEvent() + nq = newQueueEvents(eventSystem) + nq.sendNewQueueEvent(queue.QueuePath, true) assert.Equal(t, 1, len(eventSystem.Events), "event was not generated") event := eventSystem.Events[0] assert.Equal(t, si.EventRecord_QUEUE, event.Type) @@ -56,28 +56,21 @@ func TestSendNewQueueEvent(t *testing.T) { assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail) assert.Equal(t, 0, len(event.Resource.Resources)) eventSystem = mock.NewEventSystem() - nq = newQueueEvents(&Queue{ - QueuePath: testQueuePath, - isManaged: false, - }, eventSystem) - nq.sendNewQueueEvent() + nq = newQueueEvents(eventSystem) + nq.sendNewQueueEvent(queue.QueuePath, false) event = eventSystem.Events[0] assert.Equal(t, si.EventRecord_QUEUE_DYNAMIC, event.EventChangeDetail) } func TestSendRemoveQueueEvent(t *testing.T) { - queue := &Queue{ - QueuePath: testQueuePath, - isManaged: true, - } eventSystem := mock.NewEventSystemDisabled() - nq := newQueueEvents(queue, eventSystem) - nq.sendRemoveQueueEvent() + nq := newQueueEvents(eventSystem) + nq.sendRemoveQueueEvent(testQueuePath, true) assert.Equal(t, 0, len(eventSystem.Events), "unexpected event") eventSystem = mock.NewEventSystem() - nq = newQueueEvents(queue, eventSystem) - nq.sendRemoveQueueEvent() + nq = newQueueEvents(eventSystem) + nq.sendRemoveQueueEvent(testQueuePath, true) assert.Equal(t, 1, len(eventSystem.Events), "event was not generated") event := eventSystem.Events[0] assert.Equal(t, si.EventRecord_QUEUE, event.Type) @@ -88,27 +81,21 @@ func TestSendRemoveQueueEvent(t *testing.T) { assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail) assert.Equal(t, 0, len(event.Resource.Resources)) eventSystem = mock.NewEventSystem() - nq = newQueueEvents(&Queue{ - QueuePath: testQueuePath, - isManaged: false, - }, eventSystem) - nq.sendRemoveQueueEvent() + nq = newQueueEvents(eventSystem) + nq.sendRemoveQueueEvent(testQueuePath, false) event = eventSystem.Events[0] assert.Equal(t, si.EventRecord_QUEUE_DYNAMIC, event.EventChangeDetail) } func TestNewApplicationEvent(t *testing.T) { - queue := &Queue{ - QueuePath: testQueuePath, - } eventSystem := mock.NewEventSystemDisabled() - nq := newQueueEvents(queue, eventSystem) - nq.sendNewApplicationEvent(appID0) + nq := newQueueEvents(eventSystem) + nq.sendNewApplicationEvent(testQueuePath, appID0) assert.Equal(t, 0, len(eventSystem.Events), "unexpected event") eventSystem = mock.NewEventSystem() - nq = newQueueEvents(queue, eventSystem) - nq.sendNewApplicationEvent(appID0) + nq = newQueueEvents(eventSystem) + nq.sendNewApplicationEvent(testQueuePath, appID0) assert.Equal(t, 1, len(eventSystem.Events), "event was not generated") event := eventSystem.Events[0] assert.Equal(t, si.EventRecord_QUEUE, event.Type) @@ -121,17 +108,14 @@ func TestNewApplicationEvent(t *testing.T) { } func TestRemoveApplicationEvent(t *testing.T) { - queue := &Queue{ - QueuePath: testQueuePath, - } eventSystem := mock.NewEventSystemDisabled() - nq := newQueueEvents(queue, eventSystem) - nq.sendRemoveApplicationEvent(appID0) + nq := newQueueEvents(eventSystem) + nq.sendRemoveApplicationEvent(testQueuePath, appID0) assert.Equal(t, 0, len(eventSystem.Events), "unexpected event") eventSystem = mock.NewEventSystem() - nq = newQueueEvents(queue, eventSystem) - nq.sendRemoveApplicationEvent(appID0) + nq = newQueueEvents(eventSystem) + nq.sendRemoveApplicationEvent(testQueuePath, appID0) assert.Equal(t, 1, len(eventSystem.Events), "event was not generated") event := eventSystem.Events[0] assert.Equal(t, si.EventRecord_QUEUE, event.Type) @@ -144,17 +128,14 @@ func TestRemoveApplicationEvent(t *testing.T) { } func TestTypeChangedEvent(t *testing.T) { - queue := &Queue{ - QueuePath: testQueuePath, - } eventSystem := mock.NewEventSystemDisabled() - nq := newQueueEvents(queue, eventSystem) - nq.sendTypeChangedEvent() + nq := newQueueEvents(eventSystem) + nq.sendTypeChangedEvent(testQueuePath, true) assert.Equal(t, 0, len(eventSystem.Events), "unexpected event") eventSystem = mock.NewEventSystem() - nq = newQueueEvents(queue, eventSystem) - nq.sendTypeChangedEvent() + nq = newQueueEvents(eventSystem) + nq.sendTypeChangedEvent(testQueuePath, false) assert.Equal(t, 1, len(eventSystem.Events), "event was not generated") event := eventSystem.Events[0] assert.Equal(t, si.EventRecord_QUEUE, event.Type) @@ -168,18 +149,14 @@ func TestTypeChangedEvent(t *testing.T) { func TestSendMaxResourceChangedEvent(t *testing.T) { maxRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) - queue := &Queue{ - QueuePath: testQueuePath, - maxResource: maxRes, - } eventSystem := mock.NewEventSystemDisabled() - nq := newQueueEvents(queue, eventSystem) - nq.sendMaxResourceChangedEvent() + nq := newQueueEvents(eventSystem) + nq.sendMaxResourceChangedEvent(testQueuePath, maxRes) assert.Equal(t, 0, len(eventSystem.Events), "unexpected event") eventSystem = mock.NewEventSystem() - nq = newQueueEvents(queue, eventSystem) - nq.sendMaxResourceChangedEvent() + nq = newQueueEvents(eventSystem) + nq.sendMaxResourceChangedEvent(testQueuePath, maxRes) assert.Equal(t, 1, len(eventSystem.Events), "event was not generated") event := eventSystem.Events[0] assert.Equal(t, si.EventRecord_QUEUE, event.Type) @@ -195,18 +172,14 @@ func TestSendMaxResourceChangedEvent(t *testing.T) { func TestSendGuaranteedResourceChangedEvent(t *testing.T) { guaranteed := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) - queue := &Queue{ - QueuePath: testQueuePath, - guaranteedResource: guaranteed, - } eventSystem := mock.NewEventSystemDisabled() - nq := newQueueEvents(queue, eventSystem) - nq.sendGuaranteedResourceChangedEvent() + nq := newQueueEvents(eventSystem) + nq.sendGuaranteedResourceChangedEvent(testQueuePath, guaranteed) assert.Equal(t, 0, len(eventSystem.Events), "unexpected event") eventSystem = mock.NewEventSystem() - nq = newQueueEvents(queue, eventSystem) - nq.sendGuaranteedResourceChangedEvent() + nq = newQueueEvents(eventSystem) + nq.sendGuaranteedResourceChangedEvent(testQueuePath, guaranteed) assert.Equal(t, 1, len(eventSystem.Events), "event was not generated") event := eventSystem.Events[0] assert.Equal(t, si.EventRecord_QUEUE, event.Type) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org For additional commands, e-mail: issues-h...@yunikorn.apache.org