This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git

commit 78c849e95b86b0843d8b2b7cf9d901b0fcfec1dc
Author: Peter Bacsko <pbac...@cloudera.com>
AuthorDate: Mon Apr 15 13:18:44 2024 +0200

    [YUNIKORN-2552] Recursive locking when sending remove queue event (#841)
    
    Closes: #841
    
    Signed-off-by: Peter Bacsko <pbac...@cloudera.com>
---
 pkg/scheduler/objects/queue.go             | 32 +++++------
 pkg/scheduler/objects/queue_events.go      | 43 +++++++-------
 pkg/scheduler/objects/queue_events_test.go | 91 +++++++++++-------------------
 3 files changed, 67 insertions(+), 99 deletions(-)

diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 827c0aca..c45b4ac5 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -142,10 +142,10 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent *Queue) (*Queue, error)
        } else {
                sq.UpdateQueueProperties()
        }
-       sq.queueEvents = newQueueEvents(sq, events.GetEventSystem())
+       sq.queueEvents = newQueueEvents(events.GetEventSystem())
        log.Log(log.SchedQueue).Info("configured queue added to scheduler",
                zap.String("queueName", sq.QueuePath))
-       sq.queueEvents.sendNewQueueEvent()
+       sq.queueEvents.sendNewQueueEvent(sq.QueuePath, sq.isManaged)
 
        return sq, nil
 }
@@ -200,10 +200,10 @@ func newDynamicQueueInternal(name string, leaf bool, parent *Queue) (*Queue, err
        }
 
        sq.UpdateQueueProperties()
-       sq.queueEvents = newQueueEvents(sq, events.GetEventSystem())
+       sq.queueEvents = newQueueEvents(events.GetEventSystem())
        log.Log(log.SchedQueue).Info("dynamic queue added to scheduler",
                zap.String("queueName", sq.QueuePath))
-       sq.queueEvents.sendNewQueueEvent()
+       sq.queueEvents.sendNewQueueEvent(sq.QueuePath, sq.isManaged)
 
        return sq, nil
 }
@@ -338,7 +338,7 @@ func (sq *Queue) applyConf(conf configs.QueueConfig) error {
        }
 
        if prevLeaf != sq.isLeaf && sq.queueEvents != nil {
-               sq.queueEvents.sendTypeChangedEvent()
+               sq.queueEvents.sendTypeChangedEvent(sq.QueuePath, sq.isLeaf)
        }
 
        if !sq.isLeaf {
@@ -384,7 +384,7 @@ func (sq *Queue) setResources(resource configs.Resources) error {
                        zap.Stringer("current", sq.maxResource),
                        zap.Stringer("new", maxResource))
                if !resources.Equals(sq.maxResource, maxResource) && sq.queueEvents != nil {
-                       sq.queueEvents.sendMaxResourceChangedEvent()
+                       sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, maxResource)
                }
                sq.maxResource = maxResource
                sq.updateMaxResourceMetrics()
@@ -394,7 +394,7 @@ func (sq *Queue) setResources(resource configs.Resources) error {
                        zap.Stringer("current", sq.maxResource),
                        zap.Stringer("new", maxResource))
                if sq.queueEvents != nil {
-                       sq.queueEvents.sendMaxResourceChangedEvent()
+                       sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, maxResource)
                }
                sq.maxResource = nil
                sq.updateMaxResourceMetrics()
@@ -410,7 +410,7 @@ func (sq *Queue) setResources(resource configs.Resources) error {
                        zap.Stringer("current", sq.guaranteedResource),
                        zap.Stringer("new", guaranteedResource))
                if !resources.Equals(sq.guaranteedResource, guaranteedResource) && sq.queueEvents != nil {
-                       sq.queueEvents.sendGuaranteedResourceChangedEvent()
+                       sq.queueEvents.sendGuaranteedResourceChangedEvent(sq.QueuePath, guaranteedResource)
                }
                sq.guaranteedResource = guaranteedResource
                sq.updateGuaranteedResourceMetrics()
@@ -420,7 +420,7 @@ func (sq *Queue) setResources(resource configs.Resources) error {
                        zap.Stringer("current", sq.guaranteedResource),
                        zap.Stringer("new", guaranteedResource))
                if sq.queueEvents != nil {
-                       sq.queueEvents.sendGuaranteedResourceChangedEvent()
+                       sq.queueEvents.sendGuaranteedResourceChangedEvent(sq.QueuePath, guaranteedResource)
                }
                sq.guaranteedResource = nil
                sq.updateGuaranteedResourceMetrics()
@@ -728,7 +728,7 @@ func (sq *Queue) AddApplication(app *Application) {
        defer sq.Unlock()
        appID := app.ApplicationID
        sq.applications[appID] = app
-       sq.queueEvents.sendNewApplicationEvent(appID)
+       sq.queueEvents.sendNewApplicationEvent(sq.QueuePath, appID)
        // YUNIKORN-199: update the quota from the namespace
        // get the tag with the quota
        quota := app.GetTag(siCommon.AppTagNamespaceResourceQuota)
@@ -800,7 +800,7 @@ func (sq *Queue) RemoveApplication(app *Application) {
                        zap.String("applicationID", appID))
                return
        }
-       sq.queueEvents.sendRemoveApplicationEvent(appID)
+       sq.queueEvents.sendRemoveApplicationEvent(sq.QueuePath, appID)
        if appPending := app.GetPendingResource(); !resources.IsZero(appPending) {
                sq.decPendingResource(appPending)
        }
@@ -1002,7 +1002,7 @@ func (sq *Queue) RemoveQueue() bool {
        log.Log(log.SchedQueue).Info("removing queue", zap.String("queue", sq.QueuePath))
        // root is always managed and is the only queue with a nil parent: no need to guard
        sq.parent.removeChildQueue(sq.Name)
-       sq.queueEvents.sendRemoveQueueEvent()
+       sq.queueEvents.sendRemoveQueueEvent(sq.QueuePath, sq.isManaged)
        return true
 }
 
@@ -1323,7 +1323,7 @@ func (sq *Queue) SetMaxResource(max *resources.Resource) {
                        zap.Stringer("current", sq.maxResource),
                        zap.Stringer("new", max))
                if !resources.Equals(sq.maxResource, max) && sq.queueEvents != nil {
-                       sq.queueEvents.sendMaxResourceChangedEvent()
+                       sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, sq.maxResource)
                }
                sq.maxResource = max.Clone()
                sq.updateMaxResourceMetrics()
@@ -1333,7 +1333,7 @@ func (sq *Queue) SetMaxResource(max *resources.Resource) {
                        zap.Stringer("current", sq.maxResource),
                        zap.Stringer("new", max))
                if sq.queueEvents != nil {
-                       sq.queueEvents.sendMaxResourceChangedEvent()
+                       sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, sq.maxResource)
                }
                sq.maxResource = nil
                sq.updateMaxResourceMetrics()
@@ -1989,7 +1989,3 @@ func (sq *Queue) recalculatePriority() int32 {
        sq.currentPriority = curr
        return priorityValueByPolicy(sq.priorityPolicy, sq.priorityOffset, curr)
 }
-
-func (sq *Queue) SendRemoveQueueEvent() {
-       sq.queueEvents.sendRemoveQueueEvent()
-}
diff --git a/pkg/scheduler/objects/queue_events.go b/pkg/scheduler/objects/queue_events.go
index 35160484..7ef24d3f 100644
--- a/pkg/scheduler/objects/queue_events.go
+++ b/pkg/scheduler/objects/queue_events.go
@@ -20,93 +20,92 @@ package objects
 
 import (
        "github.com/apache/yunikorn-core/pkg/common"
+       "github.com/apache/yunikorn-core/pkg/common/resources"
        "github.com/apache/yunikorn-core/pkg/events"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
 type queueEvents struct {
        eventSystem events.EventSystem
-       queue       *Queue
 }
 
-func (q *queueEvents) sendNewQueueEvent() {
+func (q *queueEvents) sendNewQueueEvent(queuePath string, managed bool) {
        if !q.eventSystem.IsEventTrackingEnabled() {
                return
        }
        detail := si.EventRecord_QUEUE_DYNAMIC
-       if q.queue.IsManaged() {
+       if managed {
                detail = si.EventRecord_DETAILS_NONE
        }
-       event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, common.Empty, si.EventRecord_ADD,
+       event := events.CreateQueueEventRecord(queuePath, common.Empty, common.Empty, si.EventRecord_ADD,
                detail, nil)
        q.eventSystem.AddEvent(event)
 }
 
-func (q *queueEvents) sendNewApplicationEvent(appID string) {
+func (q *queueEvents) sendNewApplicationEvent(queuePath, appID string) {
        if !q.eventSystem.IsEventTrackingEnabled() {
                return
        }
-       event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, appID, si.EventRecord_ADD,
+       event := events.CreateQueueEventRecord(queuePath, common.Empty, appID, si.EventRecord_ADD,
                si.EventRecord_QUEUE_APP, nil)
        q.eventSystem.AddEvent(event)
 }
 
-func (q *queueEvents) sendRemoveQueueEvent() {
+func (q *queueEvents) sendRemoveQueueEvent(queuePath string, managed bool) {
        if !q.eventSystem.IsEventTrackingEnabled() {
                return
        }
        detail := si.EventRecord_QUEUE_DYNAMIC
-       if q.queue.IsManaged() {
+       if managed {
                detail = si.EventRecord_DETAILS_NONE
        }
-       event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, common.Empty, si.EventRecord_REMOVE,
+       event := events.CreateQueueEventRecord(queuePath, common.Empty, common.Empty, si.EventRecord_REMOVE,
                detail, nil)
        q.eventSystem.AddEvent(event)
 }
 
-func (q *queueEvents) sendRemoveApplicationEvent(appID string) {
+func (q *queueEvents) sendRemoveApplicationEvent(queuePath, appID string) {
        if !q.eventSystem.IsEventTrackingEnabled() {
                return
        }
-       event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, appID, si.EventRecord_REMOVE,
+       event := events.CreateQueueEventRecord(queuePath, common.Empty, appID, si.EventRecord_REMOVE,
                si.EventRecord_QUEUE_APP, nil)
        q.eventSystem.AddEvent(event)
 }
 
-func (q *queueEvents) sendMaxResourceChangedEvent() {
+func (q *queueEvents) sendMaxResourceChangedEvent(queuePath string, maxResource *resources.Resource) {
        if !q.eventSystem.IsEventTrackingEnabled() {
                return
        }
-       event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, common.Empty, si.EventRecord_SET,
-               si.EventRecord_QUEUE_MAX, q.queue.maxResource)
+       event := events.CreateQueueEventRecord(queuePath, common.Empty, common.Empty, si.EventRecord_SET,
+               si.EventRecord_QUEUE_MAX, maxResource)
        q.eventSystem.AddEvent(event)
 }
 
-func (q *queueEvents) sendGuaranteedResourceChangedEvent() {
+func (q *queueEvents) sendGuaranteedResourceChangedEvent(queuePath string, guaranteed *resources.Resource) {
        if !q.eventSystem.IsEventTrackingEnabled() {
                return
        }
-       event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, common.Empty, si.EventRecord_SET,
-               si.EventRecord_QUEUE_GUARANTEED, q.queue.guaranteedResource)
+       event := events.CreateQueueEventRecord(queuePath, common.Empty, common.Empty, si.EventRecord_SET,
+               si.EventRecord_QUEUE_GUARANTEED, guaranteed)
        q.eventSystem.AddEvent(event)
 }
 
-func (q *queueEvents) sendTypeChangedEvent() {
+func (q *queueEvents) sendTypeChangedEvent(queuePath string, isLeaf bool) {
        if !q.eventSystem.IsEventTrackingEnabled() {
                return
        }
        message := "leaf queue: false"
-       if q.queue.isLeaf {
+       if isLeaf {
                message = "leaf queue: true"
        }
-       event := events.CreateQueueEventRecord(q.queue.QueuePath, message, common.Empty, si.EventRecord_SET,
+       event := events.CreateQueueEventRecord(queuePath, message, common.Empty, si.EventRecord_SET,
                si.EventRecord_QUEUE_TYPE, nil)
        q.eventSystem.AddEvent(event)
 }
 
-func newQueueEvents(queue *Queue, evt events.EventSystem) *queueEvents {
+func newQueueEvents(evt events.EventSystem) *queueEvents {
        return &queueEvents{
                eventSystem: evt,
-               queue:       queue,
        }
 }
diff --git a/pkg/scheduler/objects/queue_events_test.go b/pkg/scheduler/objects/queue_events_test.go
index d36bf36d..07d781fb 100644
--- a/pkg/scheduler/objects/queue_events_test.go
+++ b/pkg/scheduler/objects/queue_events_test.go
@@ -39,13 +39,13 @@ func TestSendNewQueueEvent(t *testing.T) {
                isManaged: true,
        }
        eventSystem := mock.NewEventSystemDisabled()
-       nq := newQueueEvents(queue, eventSystem)
-       nq.sendNewQueueEvent()
+       nq := newQueueEvents(eventSystem)
+       nq.sendNewQueueEvent(queue.QueuePath, false)
        assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
 
        eventSystem = mock.NewEventSystem()
-       nq = newQueueEvents(queue, eventSystem)
-       nq.sendNewQueueEvent()
+       nq = newQueueEvents(eventSystem)
+       nq.sendNewQueueEvent(queue.QueuePath, true)
        assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
        event := eventSystem.Events[0]
        assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -56,28 +56,21 @@ func TestSendNewQueueEvent(t *testing.T) {
        assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
        assert.Equal(t, 0, len(event.Resource.Resources))
        eventSystem = mock.NewEventSystem()
-       nq = newQueueEvents(&Queue{
-               QueuePath: testQueuePath,
-               isManaged: false,
-       }, eventSystem)
-       nq.sendNewQueueEvent()
+       nq = newQueueEvents(eventSystem)
+       nq.sendNewQueueEvent(queue.QueuePath, false)
        event = eventSystem.Events[0]
        assert.Equal(t, si.EventRecord_QUEUE_DYNAMIC, event.EventChangeDetail)
 }
 
 func TestSendRemoveQueueEvent(t *testing.T) {
-       queue := &Queue{
-               QueuePath: testQueuePath,
-               isManaged: true,
-       }
        eventSystem := mock.NewEventSystemDisabled()
-       nq := newQueueEvents(queue, eventSystem)
-       nq.sendRemoveQueueEvent()
+       nq := newQueueEvents(eventSystem)
+       nq.sendRemoveQueueEvent(testQueuePath, true)
        assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
 
        eventSystem = mock.NewEventSystem()
-       nq = newQueueEvents(queue, eventSystem)
-       nq.sendRemoveQueueEvent()
+       nq = newQueueEvents(eventSystem)
+       nq.sendRemoveQueueEvent(testQueuePath, true)
        assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
        event := eventSystem.Events[0]
        assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -88,27 +81,21 @@ func TestSendRemoveQueueEvent(t *testing.T) {
        assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
        assert.Equal(t, 0, len(event.Resource.Resources))
        eventSystem = mock.NewEventSystem()
-       nq = newQueueEvents(&Queue{
-               QueuePath: testQueuePath,
-               isManaged: false,
-       }, eventSystem)
-       nq.sendRemoveQueueEvent()
+       nq = newQueueEvents(eventSystem)
+       nq.sendRemoveQueueEvent(testQueuePath, false)
        event = eventSystem.Events[0]
        assert.Equal(t, si.EventRecord_QUEUE_DYNAMIC, event.EventChangeDetail)
 }
 
 func TestNewApplicationEvent(t *testing.T) {
-       queue := &Queue{
-               QueuePath: testQueuePath,
-       }
        eventSystem := mock.NewEventSystemDisabled()
-       nq := newQueueEvents(queue, eventSystem)
-       nq.sendNewApplicationEvent(appID0)
+       nq := newQueueEvents(eventSystem)
+       nq.sendNewApplicationEvent(testQueuePath, appID0)
        assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
 
        eventSystem = mock.NewEventSystem()
-       nq = newQueueEvents(queue, eventSystem)
-       nq.sendNewApplicationEvent(appID0)
+       nq = newQueueEvents(eventSystem)
+       nq.sendNewApplicationEvent(testQueuePath, appID0)
        assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
        event := eventSystem.Events[0]
        assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -121,17 +108,14 @@ func TestNewApplicationEvent(t *testing.T) {
 }
 
 func TestRemoveApplicationEvent(t *testing.T) {
-       queue := &Queue{
-               QueuePath: testQueuePath,
-       }
        eventSystem := mock.NewEventSystemDisabled()
-       nq := newQueueEvents(queue, eventSystem)
-       nq.sendRemoveApplicationEvent(appID0)
+       nq := newQueueEvents(eventSystem)
+       nq.sendRemoveApplicationEvent(testQueuePath, appID0)
        assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
 
        eventSystem = mock.NewEventSystem()
-       nq = newQueueEvents(queue, eventSystem)
-       nq.sendRemoveApplicationEvent(appID0)
+       nq = newQueueEvents(eventSystem)
+       nq.sendRemoveApplicationEvent(testQueuePath, appID0)
        assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
        event := eventSystem.Events[0]
        assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -144,17 +128,14 @@ func TestRemoveApplicationEvent(t *testing.T) {
 }
 
 func TestTypeChangedEvent(t *testing.T) {
-       queue := &Queue{
-               QueuePath: testQueuePath,
-       }
        eventSystem := mock.NewEventSystemDisabled()
-       nq := newQueueEvents(queue, eventSystem)
-       nq.sendTypeChangedEvent()
+       nq := newQueueEvents(eventSystem)
+       nq.sendTypeChangedEvent(testQueuePath, true)
        assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
 
        eventSystem = mock.NewEventSystem()
-       nq = newQueueEvents(queue, eventSystem)
-       nq.sendTypeChangedEvent()
+       nq = newQueueEvents(eventSystem)
+       nq.sendTypeChangedEvent(testQueuePath, false)
        assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
        event := eventSystem.Events[0]
        assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -168,18 +149,14 @@ func TestTypeChangedEvent(t *testing.T) {
 
 func TestSendMaxResourceChangedEvent(t *testing.T) {
        maxRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
-       queue := &Queue{
-               QueuePath:   testQueuePath,
-               maxResource: maxRes,
-       }
        eventSystem := mock.NewEventSystemDisabled()
-       nq := newQueueEvents(queue, eventSystem)
-       nq.sendMaxResourceChangedEvent()
+       nq := newQueueEvents(eventSystem)
+       nq.sendMaxResourceChangedEvent(testQueuePath, maxRes)
        assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
 
        eventSystem = mock.NewEventSystem()
-       nq = newQueueEvents(queue, eventSystem)
-       nq.sendMaxResourceChangedEvent()
+       nq = newQueueEvents(eventSystem)
+       nq.sendMaxResourceChangedEvent(testQueuePath, maxRes)
        assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
        event := eventSystem.Events[0]
        assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -195,18 +172,14 @@ func TestSendMaxResourceChangedEvent(t *testing.T) {
 
 func TestSendGuaranteedResourceChangedEvent(t *testing.T) {
        guaranteed := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
-       queue := &Queue{
-               QueuePath:          testQueuePath,
-               guaranteedResource: guaranteed,
-       }
        eventSystem := mock.NewEventSystemDisabled()
-       nq := newQueueEvents(queue, eventSystem)
-       nq.sendGuaranteedResourceChangedEvent()
+       nq := newQueueEvents(eventSystem)
+       nq.sendGuaranteedResourceChangedEvent(testQueuePath, guaranteed)
        assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
 
        eventSystem = mock.NewEventSystem()
-       nq = newQueueEvents(queue, eventSystem)
-       nq.sendGuaranteedResourceChangedEvent()
+       nq = newQueueEvents(eventSystem)
+       nq.sendGuaranteedResourceChangedEvent(testQueuePath, guaranteed)
        assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
        event := eventSystem.Events[0]
        assert.Equal(t, si.EventRecord_QUEUE, event.Type)


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org
For additional commands, e-mail: issues-h...@yunikorn.apache.org

Reply via email to