This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 16d2c9cd [YUNIKORN-2548] Potential deadlock during concurrent bottom-up/top-down queue traversal (#839)
16d2c9cd is described below

commit 16d2c9cd79e73be51d23f0ce5c5d9f153be140bc
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Apr 10 21:21:37 2024 +0200

    [YUNIKORN-2548] Potential deadlock during concurrent bottom-up/top-down queue traversal (#839)
    
    Closes: #839
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/scheduler/objects/queue.go | 62 +++++++++++++++++++++++++++---------------
 1 file changed, 40 insertions(+), 22 deletions(-)

diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 214b2c6c..4999fd92 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -958,27 +958,32 @@ func (sq *Queue) addChildQueue(child *Queue) error {
 // This can be executed multiple times and is only effective the first time.
 // This is a noop on an unmanaged queue.
 func (sq *Queue) MarkQueueForRemoval() {
-       // need to lock for write as we don't want to add a queue while marking for removal
-       sq.Lock()
-       defer sq.Unlock()
+       if !sq.IsManaged() {
+               return
+       }
+       children := sq.GetCopyOfChildren()
        // Mark the managed queue for deletion: it is removed from the config let it drain.
        // Also mark all the managed children for deletion.
-       if sq.isManaged {
-               log.Log(log.SchedQueue).Info("marking managed queue for deletion",
-                       zap.String("queue", sq.QueuePath))
-               if err := sq.handleQueueEvent(Remove); err != nil {
-                       log.Log(log.SchedQueue).Warn("failed to mark managed queue for deletion",
-                               zap.String("queue", sq.QueuePath),
-                               zap.Error(err))
-               }
-               if len(sq.children) > 0 {
-                       for _, child := range sq.children {
-                               child.MarkQueueForRemoval()
-                       }
+       log.Log(log.SchedQueue).Info("marking managed queue for deletion",
+               zap.String("queue", sq.QueuePath))
+       sq.doRemoveQueue()
+       if len(sq.children) > 0 {
+               for _, child := range children {
+                       child.MarkQueueForRemoval()
                }
        }
 }
 
+func (sq *Queue) doRemoveQueue() {
+       sq.Lock()
+       defer sq.Unlock()
+       if err := sq.handleQueueEvent(Remove); err != nil {
+               log.Log(log.SchedQueue).Warn("failed to mark managed queue for deletion",
+                       zap.String("queue", sq.QueuePath),
+                       zap.Error(err))
+       }
+}
+
 // GetChildQueue returns a queue if the name exists in the child map as a key.
 func (sq *Queue) GetChildQueue(name string) *Queue {
        sq.RLock()
@@ -1031,13 +1036,10 @@ func (sq *Queue) isRoot() bool {
 // IncAllocatedResource increments the allocated resources for this queue (recursively).
 // Guard against going over max resources if set
 func (sq *Queue) IncAllocatedResource(alloc *resources.Resource, nodeReported bool) error {
-       sq.Lock()
-       defer sq.Unlock()
-
        // check this queue: failure stops checks if the allocation is not part of a node addition
        newAllocated := resources.Add(sq.allocatedResource, alloc)
        if !nodeReported {
-               if !sq.maxResource.FitInMaxUndef(newAllocated) {
+               if !sq.resourceFitsMaxRes(newAllocated) {
                        return fmt.Errorf("allocation (%v) puts queue '%s' over maximum allocation (%v), current usage (%v)",
                                alloc, sq.QueuePath, sq.maxResource, sq.allocatedResource)
                }
@@ -1058,23 +1060,30 @@ func (sq *Queue) IncAllocatedResource(alloc *resources.Resource, nodeReported bo
                        return err
                }
        }
+       sq.Lock()
+       defer sq.Unlock()
        // all OK update this queue
        sq.allocatedResource = newAllocated
        sq.updateAllocatedResourceMetrics()
        return nil
 }
 
+// small helper method to access sq.maxResource and avoid Clone() call
+func (sq *Queue) resourceFitsMaxRes(res *resources.Resource) bool {
+       sq.RLock()
+       defer sq.RUnlock()
+       return sq.maxResource.FitInMaxUndef(res)
+}
+
 // DecAllocatedResource decrement the allocated resources for this queue (recursively)
 // Guard against going below zero resources.
 func (sq *Queue) DecAllocatedResource(alloc *resources.Resource) error {
        if sq == nil {
                return fmt.Errorf("queue is nil")
        }
-       sq.Lock()
-       defer sq.Unlock()
 
        // check this queue: failure stops checks
-       if alloc != nil && !sq.allocatedResource.FitIn(alloc) {
+       if alloc != nil && !sq.resourceFitsAllocated(alloc) {
                return fmt.Errorf("released allocation (%v) is larger than '%s' queue allocation (%v)",
                        alloc, sq.QueuePath, sq.allocatedResource)
        }
@@ -1094,12 +1103,21 @@ func (sq *Queue) DecAllocatedResource(alloc *resources.Resource) error {
                        return err
                }
        }
+       sq.Lock()
+       defer sq.Unlock()
        // all OK update the queue
        sq.allocatedResource = resources.Sub(sq.allocatedResource, alloc)
        sq.updateAllocatedResourceMetrics()
        return nil
 }
 
+// small helper method to access sq.allocatedResource and avoid Clone() call
+func (sq *Queue) resourceFitsAllocated(res *resources.Resource) bool {
+       sq.RLock()
+       defer sq.RUnlock()
+       return sq.allocatedResource.FitIn(res)
+}
+
 // IncPreemptingResource increments the preempting resources for this queue (recursively).
 func (sq *Queue) IncPreemptingResource(alloc *resources.Resource) {
        if sq == nil {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to