This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 72540e2b [YUNIKORN-2552] Recursive locking when sending remove queue event (#841)
72540e2b is described below

commit 72540e2b277ffc3b00ec19b21a69e31da9fbbf09
Author: Peter Bacsko <[email protected]>
AuthorDate: Mon Apr 15 13:18:44 2024 +0200

    [YUNIKORN-2552] Recursive locking when sending remove queue event (#841)
    
    Closes: #841
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/scheduler/objects/queue.go             | 32 +++++------
 pkg/scheduler/objects/queue_events.go      | 43 +++++++-------
 pkg/scheduler/objects/queue_events_test.go | 91 +++++++++++-------------------
 3 files changed, 67 insertions(+), 99 deletions(-)

diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 4999fd92..3de469a4 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -142,10 +142,10 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent *Queue) (*Queue, error)
        } else {
                sq.UpdateQueueProperties()
        }
-       sq.queueEvents = newQueueEvents(sq, events.GetEventSystem())
+       sq.queueEvents = newQueueEvents(events.GetEventSystem())
        log.Log(log.SchedQueue).Info("configured queue added to scheduler",
                zap.String("queueName", sq.QueuePath))
-       sq.queueEvents.sendNewQueueEvent()
+       sq.queueEvents.sendNewQueueEvent(sq.QueuePath, sq.isManaged)
 
        return sq, nil
 }
@@ -200,10 +200,10 @@ func newDynamicQueueInternal(name string, leaf bool, parent *Queue) (*Queue, err
        }
 
        sq.UpdateQueueProperties()
-       sq.queueEvents = newQueueEvents(sq, events.GetEventSystem())
+       sq.queueEvents = newQueueEvents(events.GetEventSystem())
        log.Log(log.SchedQueue).Info("dynamic queue added to scheduler",
                zap.String("queueName", sq.QueuePath))
-       sq.queueEvents.sendNewQueueEvent()
+       sq.queueEvents.sendNewQueueEvent(sq.QueuePath, sq.isManaged)
 
        return sq, nil
 }
@@ -346,7 +346,7 @@ func (sq *Queue) applyConf(conf configs.QueueConfig) error {
        }
 
        if prevLeaf != sq.isLeaf && sq.queueEvents != nil {
-               sq.queueEvents.sendTypeChangedEvent()
+               sq.queueEvents.sendTypeChangedEvent(sq.QueuePath, sq.isLeaf)
        }
 
        if !sq.isLeaf {
@@ -392,7 +392,7 @@ func (sq *Queue) setResources(resource configs.Resources) 
error {
                        zap.Stringer("current", sq.maxResource),
                        zap.Stringer("new", maxResource))
                if !resources.Equals(sq.maxResource, maxResource) && 
sq.queueEvents != nil {
-                       sq.queueEvents.sendMaxResourceChangedEvent()
+                       
sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, maxResource)
                }
                sq.maxResource = maxResource
                sq.updateMaxResourceMetrics()
@@ -402,7 +402,7 @@ func (sq *Queue) setResources(resource configs.Resources) 
error {
                        zap.Stringer("current", sq.maxResource),
                        zap.Stringer("new", maxResource))
                if sq.queueEvents != nil {
-                       sq.queueEvents.sendMaxResourceChangedEvent()
+                       
sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, maxResource)
                }
                sq.maxResource = nil
                sq.updateMaxResourceMetrics()
@@ -418,7 +418,7 @@ func (sq *Queue) setResources(resource configs.Resources) 
error {
                        zap.Stringer("current", sq.guaranteedResource),
                        zap.Stringer("new", guaranteedResource))
                if !resources.Equals(sq.guaranteedResource, guaranteedResource) 
&& sq.queueEvents != nil {
-                       sq.queueEvents.sendGuaranteedResourceChangedEvent()
+                       
sq.queueEvents.sendGuaranteedResourceChangedEvent(sq.QueuePath, 
guaranteedResource)
                }
                sq.guaranteedResource = guaranteedResource
                sq.updateGuaranteedResourceMetrics()
@@ -428,7 +428,7 @@ func (sq *Queue) setResources(resource configs.Resources) 
error {
                        zap.Stringer("current", sq.guaranteedResource),
                        zap.Stringer("new", guaranteedResource))
                if sq.queueEvents != nil {
-                       sq.queueEvents.sendGuaranteedResourceChangedEvent()
+                       
sq.queueEvents.sendGuaranteedResourceChangedEvent(sq.QueuePath, 
guaranteedResource)
                }
                sq.guaranteedResource = nil
                sq.updateGuaranteedResourceMetrics()
@@ -736,7 +736,7 @@ func (sq *Queue) AddApplication(app *Application) {
        defer sq.Unlock()
        appID := app.ApplicationID
        sq.applications[appID] = app
-       sq.queueEvents.sendNewApplicationEvent(appID)
+       sq.queueEvents.sendNewApplicationEvent(sq.QueuePath, appID)
        // YUNIKORN-199: update the quota from the namespace
        // get the tag with the quota
        quota := app.GetTag(siCommon.AppTagNamespaceResourceQuota)
@@ -808,7 +808,7 @@ func (sq *Queue) RemoveApplication(app *Application) {
                        zap.String("applicationID", appID))
                return
        }
-       sq.queueEvents.sendRemoveApplicationEvent(appID)
+       sq.queueEvents.sendRemoveApplicationEvent(sq.QueuePath, appID)
        if appPending := app.GetPendingResource(); 
!resources.IsZero(appPending) {
                sq.decPendingResource(appPending)
        }
@@ -1010,7 +1010,7 @@ func (sq *Queue) RemoveQueue() bool {
        log.Log(log.SchedQueue).Info("removing queue", zap.String("queue", 
sq.QueuePath))
        // root is always managed and is the only queue with a nil parent: no 
need to guard
        sq.parent.removeChildQueue(sq.Name)
-       sq.queueEvents.sendRemoveQueueEvent()
+       sq.queueEvents.sendRemoveQueueEvent(sq.QueuePath, sq.isManaged)
        return true
 }
 
@@ -1322,7 +1322,7 @@ func (sq *Queue) SetMaxResource(max *resources.Resource) {
                        zap.Stringer("current", sq.maxResource),
                        zap.Stringer("new", max))
                if !resources.Equals(sq.maxResource, max) && sq.queueEvents != 
nil {
-                       sq.queueEvents.sendMaxResourceChangedEvent()
+                       
sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, sq.maxResource)
                }
                sq.maxResource = max.Clone()
                sq.updateMaxResourceMetrics()
@@ -1332,7 +1332,7 @@ func (sq *Queue) SetMaxResource(max *resources.Resource) {
                        zap.Stringer("current", sq.maxResource),
                        zap.Stringer("new", max))
                if sq.queueEvents != nil {
-                       sq.queueEvents.sendMaxResourceChangedEvent()
+                       
sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, sq.maxResource)
                }
                sq.maxResource = nil
                sq.updateMaxResourceMetrics()
@@ -1987,7 +1987,3 @@ func (sq *Queue) recalculatePriority() int32 {
        sq.currentPriority = curr
        return priorityValueByPolicy(sq.priorityPolicy, sq.priorityOffset, curr)
 }
-
-func (sq *Queue) SendRemoveQueueEvent() {
-       sq.queueEvents.sendRemoveQueueEvent()
-}
diff --git a/pkg/scheduler/objects/queue_events.go b/pkg/scheduler/objects/queue_events.go
index 35160484..7ef24d3f 100644
--- a/pkg/scheduler/objects/queue_events.go
+++ b/pkg/scheduler/objects/queue_events.go
@@ -20,93 +20,92 @@ package objects
 
 import (
        "github.com/apache/yunikorn-core/pkg/common"
+       "github.com/apache/yunikorn-core/pkg/common/resources"
        "github.com/apache/yunikorn-core/pkg/events"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
 type queueEvents struct {
        eventSystem events.EventSystem
-       queue       *Queue
 }
 
-func (q *queueEvents) sendNewQueueEvent() {
+func (q *queueEvents) sendNewQueueEvent(queuePath string, managed bool) {
        if !q.eventSystem.IsEventTrackingEnabled() {
                return
        }
        detail := si.EventRecord_QUEUE_DYNAMIC
-       if q.queue.IsManaged() {
+       if managed {
                detail = si.EventRecord_DETAILS_NONE
        }
-       event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, 
common.Empty, si.EventRecord_ADD,
+       event := events.CreateQueueEventRecord(queuePath, common.Empty, 
common.Empty, si.EventRecord_ADD,
                detail, nil)
        q.eventSystem.AddEvent(event)
 }
 
-func (q *queueEvents) sendNewApplicationEvent(appID string) {
+func (q *queueEvents) sendNewApplicationEvent(queuePath, appID string) {
        if !q.eventSystem.IsEventTrackingEnabled() {
                return
        }
-       event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, 
appID, si.EventRecord_ADD,
+       event := events.CreateQueueEventRecord(queuePath, common.Empty, appID, 
si.EventRecord_ADD,
                si.EventRecord_QUEUE_APP, nil)
        q.eventSystem.AddEvent(event)
 }
 
-func (q *queueEvents) sendRemoveQueueEvent() {
+func (q *queueEvents) sendRemoveQueueEvent(queuePath string, managed bool) {
        if !q.eventSystem.IsEventTrackingEnabled() {
                return
        }
        detail := si.EventRecord_QUEUE_DYNAMIC
-       if q.queue.IsManaged() {
+       if managed {
                detail = si.EventRecord_DETAILS_NONE
        }
-       event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, 
common.Empty, si.EventRecord_REMOVE,
+       event := events.CreateQueueEventRecord(queuePath, common.Empty, 
common.Empty, si.EventRecord_REMOVE,
                detail, nil)
        q.eventSystem.AddEvent(event)
 }
 
-func (q *queueEvents) sendRemoveApplicationEvent(appID string) {
+func (q *queueEvents) sendRemoveApplicationEvent(queuePath, appID string) {
        if !q.eventSystem.IsEventTrackingEnabled() {
                return
        }
-       event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, 
appID, si.EventRecord_REMOVE,
+       event := events.CreateQueueEventRecord(queuePath, common.Empty, appID, 
si.EventRecord_REMOVE,
                si.EventRecord_QUEUE_APP, nil)
        q.eventSystem.AddEvent(event)
 }
 
-func (q *queueEvents) sendMaxResourceChangedEvent() {
+func (q *queueEvents) sendMaxResourceChangedEvent(queuePath string, 
maxResource *resources.Resource) {
        if !q.eventSystem.IsEventTrackingEnabled() {
                return
        }
-       event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, 
common.Empty, si.EventRecord_SET,
-               si.EventRecord_QUEUE_MAX, q.queue.maxResource)
+       event := events.CreateQueueEventRecord(queuePath, common.Empty, 
common.Empty, si.EventRecord_SET,
+               si.EventRecord_QUEUE_MAX, maxResource)
        q.eventSystem.AddEvent(event)
 }
 
-func (q *queueEvents) sendGuaranteedResourceChangedEvent() {
+func (q *queueEvents) sendGuaranteedResourceChangedEvent(queuePath string, 
guaranteed *resources.Resource) {
        if !q.eventSystem.IsEventTrackingEnabled() {
                return
        }
-       event := events.CreateQueueEventRecord(q.queue.QueuePath, common.Empty, 
common.Empty, si.EventRecord_SET,
-               si.EventRecord_QUEUE_GUARANTEED, q.queue.guaranteedResource)
+       event := events.CreateQueueEventRecord(queuePath, common.Empty, 
common.Empty, si.EventRecord_SET,
+               si.EventRecord_QUEUE_GUARANTEED, guaranteed)
        q.eventSystem.AddEvent(event)
 }
 
-func (q *queueEvents) sendTypeChangedEvent() {
+func (q *queueEvents) sendTypeChangedEvent(queuePath string, isLeaf bool) {
        if !q.eventSystem.IsEventTrackingEnabled() {
                return
        }
        message := "leaf queue: false"
-       if q.queue.isLeaf {
+       if isLeaf {
                message = "leaf queue: true"
        }
-       event := events.CreateQueueEventRecord(q.queue.QueuePath, message, 
common.Empty, si.EventRecord_SET,
+       event := events.CreateQueueEventRecord(queuePath, message, 
common.Empty, si.EventRecord_SET,
                si.EventRecord_QUEUE_TYPE, nil)
        q.eventSystem.AddEvent(event)
 }
 
-func newQueueEvents(queue *Queue, evt events.EventSystem) *queueEvents {
+func newQueueEvents(evt events.EventSystem) *queueEvents {
        return &queueEvents{
                eventSystem: evt,
-               queue:       queue,
        }
 }
diff --git a/pkg/scheduler/objects/queue_events_test.go b/pkg/scheduler/objects/queue_events_test.go
index d36bf36d..07d781fb 100644
--- a/pkg/scheduler/objects/queue_events_test.go
+++ b/pkg/scheduler/objects/queue_events_test.go
@@ -39,13 +39,13 @@ func TestSendNewQueueEvent(t *testing.T) {
                isManaged: true,
        }
        eventSystem := mock.NewEventSystemDisabled()
-       nq := newQueueEvents(queue, eventSystem)
-       nq.sendNewQueueEvent()
+       nq := newQueueEvents(eventSystem)
+       nq.sendNewQueueEvent(queue.QueuePath, false)
        assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
 
        eventSystem = mock.NewEventSystem()
-       nq = newQueueEvents(queue, eventSystem)
-       nq.sendNewQueueEvent()
+       nq = newQueueEvents(eventSystem)
+       nq.sendNewQueueEvent(queue.QueuePath, true)
        assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
        event := eventSystem.Events[0]
        assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -56,28 +56,21 @@ func TestSendNewQueueEvent(t *testing.T) {
        assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
        assert.Equal(t, 0, len(event.Resource.Resources))
        eventSystem = mock.NewEventSystem()
-       nq = newQueueEvents(&Queue{
-               QueuePath: testQueuePath,
-               isManaged: false,
-       }, eventSystem)
-       nq.sendNewQueueEvent()
+       nq = newQueueEvents(eventSystem)
+       nq.sendNewQueueEvent(queue.QueuePath, false)
        event = eventSystem.Events[0]
        assert.Equal(t, si.EventRecord_QUEUE_DYNAMIC, event.EventChangeDetail)
 }
 
 func TestSendRemoveQueueEvent(t *testing.T) {
-       queue := &Queue{
-               QueuePath: testQueuePath,
-               isManaged: true,
-       }
        eventSystem := mock.NewEventSystemDisabled()
-       nq := newQueueEvents(queue, eventSystem)
-       nq.sendRemoveQueueEvent()
+       nq := newQueueEvents(eventSystem)
+       nq.sendRemoveQueueEvent(testQueuePath, true)
        assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
 
        eventSystem = mock.NewEventSystem()
-       nq = newQueueEvents(queue, eventSystem)
-       nq.sendRemoveQueueEvent()
+       nq = newQueueEvents(eventSystem)
+       nq.sendRemoveQueueEvent(testQueuePath, true)
        assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
        event := eventSystem.Events[0]
        assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -88,27 +81,21 @@ func TestSendRemoveQueueEvent(t *testing.T) {
        assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
        assert.Equal(t, 0, len(event.Resource.Resources))
        eventSystem = mock.NewEventSystem()
-       nq = newQueueEvents(&Queue{
-               QueuePath: testQueuePath,
-               isManaged: false,
-       }, eventSystem)
-       nq.sendRemoveQueueEvent()
+       nq = newQueueEvents(eventSystem)
+       nq.sendRemoveQueueEvent(testQueuePath, false)
        event = eventSystem.Events[0]
        assert.Equal(t, si.EventRecord_QUEUE_DYNAMIC, event.EventChangeDetail)
 }
 
 func TestNewApplicationEvent(t *testing.T) {
-       queue := &Queue{
-               QueuePath: testQueuePath,
-       }
        eventSystem := mock.NewEventSystemDisabled()
-       nq := newQueueEvents(queue, eventSystem)
-       nq.sendNewApplicationEvent(appID0)
+       nq := newQueueEvents(eventSystem)
+       nq.sendNewApplicationEvent(testQueuePath, appID0)
        assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
 
        eventSystem = mock.NewEventSystem()
-       nq = newQueueEvents(queue, eventSystem)
-       nq.sendNewApplicationEvent(appID0)
+       nq = newQueueEvents(eventSystem)
+       nq.sendNewApplicationEvent(testQueuePath, appID0)
        assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
        event := eventSystem.Events[0]
        assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -121,17 +108,14 @@ func TestNewApplicationEvent(t *testing.T) {
 }
 
 func TestRemoveApplicationEvent(t *testing.T) {
-       queue := &Queue{
-               QueuePath: testQueuePath,
-       }
        eventSystem := mock.NewEventSystemDisabled()
-       nq := newQueueEvents(queue, eventSystem)
-       nq.sendRemoveApplicationEvent(appID0)
+       nq := newQueueEvents(eventSystem)
+       nq.sendRemoveApplicationEvent(testQueuePath, appID0)
        assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
 
        eventSystem = mock.NewEventSystem()
-       nq = newQueueEvents(queue, eventSystem)
-       nq.sendRemoveApplicationEvent(appID0)
+       nq = newQueueEvents(eventSystem)
+       nq.sendRemoveApplicationEvent(testQueuePath, appID0)
        assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
        event := eventSystem.Events[0]
        assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -144,17 +128,14 @@ func TestRemoveApplicationEvent(t *testing.T) {
 }
 
 func TestTypeChangedEvent(t *testing.T) {
-       queue := &Queue{
-               QueuePath: testQueuePath,
-       }
        eventSystem := mock.NewEventSystemDisabled()
-       nq := newQueueEvents(queue, eventSystem)
-       nq.sendTypeChangedEvent()
+       nq := newQueueEvents(eventSystem)
+       nq.sendTypeChangedEvent(testQueuePath, true)
        assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
 
        eventSystem = mock.NewEventSystem()
-       nq = newQueueEvents(queue, eventSystem)
-       nq.sendTypeChangedEvent()
+       nq = newQueueEvents(eventSystem)
+       nq.sendTypeChangedEvent(testQueuePath, false)
        assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
        event := eventSystem.Events[0]
        assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -168,18 +149,14 @@ func TestTypeChangedEvent(t *testing.T) {
 
 func TestSendMaxResourceChangedEvent(t *testing.T) {
        maxRes := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
-       queue := &Queue{
-               QueuePath:   testQueuePath,
-               maxResource: maxRes,
-       }
        eventSystem := mock.NewEventSystemDisabled()
-       nq := newQueueEvents(queue, eventSystem)
-       nq.sendMaxResourceChangedEvent()
+       nq := newQueueEvents(eventSystem)
+       nq.sendMaxResourceChangedEvent(testQueuePath, maxRes)
        assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
 
        eventSystem = mock.NewEventSystem()
-       nq = newQueueEvents(queue, eventSystem)
-       nq.sendMaxResourceChangedEvent()
+       nq = newQueueEvents(eventSystem)
+       nq.sendMaxResourceChangedEvent(testQueuePath, maxRes)
        assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
        event := eventSystem.Events[0]
        assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -195,18 +172,14 @@ func TestSendMaxResourceChangedEvent(t *testing.T) {
 
 func TestSendGuaranteedResourceChangedEvent(t *testing.T) {
        guaranteed := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
-       queue := &Queue{
-               QueuePath:          testQueuePath,
-               guaranteedResource: guaranteed,
-       }
        eventSystem := mock.NewEventSystemDisabled()
-       nq := newQueueEvents(queue, eventSystem)
-       nq.sendGuaranteedResourceChangedEvent()
+       nq := newQueueEvents(eventSystem)
+       nq.sendGuaranteedResourceChangedEvent(testQueuePath, guaranteed)
        assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
 
        eventSystem = mock.NewEventSystem()
-       nq = newQueueEvents(queue, eventSystem)
-       nq.sendGuaranteedResourceChangedEvent()
+       nq = newQueueEvents(eventSystem)
+       nq.sendGuaranteedResourceChangedEvent(testQueuePath, guaranteed)
        assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
        event := eventSystem.Events[0]
        assert.Equal(t, si.EventRecord_QUEUE, event.Type)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to