This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 16d2c9cd [YUNIKORN-2548] Potential deadlock during concurrent bottom-up/top-down queue traversal (#839)
16d2c9cd is described below
commit 16d2c9cd79e73be51d23f0ce5c5d9f153be140bc
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Apr 10 21:21:37 2024 +0200
[YUNIKORN-2548] Potential deadlock during concurrent bottom-up/top-down queue traversal (#839)
Closes: #839
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/scheduler/objects/queue.go | 62 +++++++++++++++++++++++++++---------------
1 file changed, 40 insertions(+), 22 deletions(-)
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 214b2c6c..4999fd92 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -958,27 +958,32 @@ func (sq *Queue) addChildQueue(child *Queue) error {
// This can be executed multiple times and is only effective the first time.
// This is a noop on an unmanaged queue.
func (sq *Queue) MarkQueueForRemoval() {
- // need to lock for write as we don't want to add a queue while marking for removal
- sq.Lock()
- defer sq.Unlock()
+ if !sq.IsManaged() {
+ return
+ }
+ children := sq.GetCopyOfChildren()
// Mark the managed queue for deletion: it is removed from the config let it drain.
// Also mark all the managed children for deletion.
- if sq.isManaged {
- log.Log(log.SchedQueue).Info("marking managed queue for deletion",
- zap.String("queue", sq.QueuePath))
- if err := sq.handleQueueEvent(Remove); err != nil {
- log.Log(log.SchedQueue).Warn("failed to mark managed queue for deletion",
- zap.String("queue", sq.QueuePath),
- zap.Error(err))
- }
- if len(sq.children) > 0 {
- for _, child := range sq.children {
- child.MarkQueueForRemoval()
- }
+ log.Log(log.SchedQueue).Info("marking managed queue for deletion",
+ zap.String("queue", sq.QueuePath))
+ sq.doRemoveQueue()
+ if len(sq.children) > 0 {
+ for _, child := range children {
+ child.MarkQueueForRemoval()
}
}
}
+func (sq *Queue) doRemoveQueue() {
+ sq.Lock()
+ defer sq.Unlock()
+ if err := sq.handleQueueEvent(Remove); err != nil {
+ log.Log(log.SchedQueue).Warn("failed to mark managed queue for deletion",
+ zap.String("queue", sq.QueuePath),
+ zap.Error(err))
+ }
+}
+
// GetChildQueue returns a queue if the name exists in the child map as a key.
func (sq *Queue) GetChildQueue(name string) *Queue {
sq.RLock()
@@ -1031,13 +1036,10 @@ func (sq *Queue) isRoot() bool {
// IncAllocatedResource increments the allocated resources for this queue (recursively).
// Guard against going over max resources if set
func (sq *Queue) IncAllocatedResource(alloc *resources.Resource, nodeReported bool) error {
- sq.Lock()
- defer sq.Unlock()
-
// check this queue: failure stops checks if the allocation is not part of a node addition
newAllocated := resources.Add(sq.allocatedResource, alloc)
if !nodeReported {
- if !sq.maxResource.FitInMaxUndef(newAllocated) {
+ if !sq.resourceFitsMaxRes(newAllocated) {
return fmt.Errorf("allocation (%v) puts queue '%s' over maximum allocation (%v), current usage (%v)",
alloc, sq.QueuePath, sq.maxResource, sq.allocatedResource)
}
@@ -1058,23 +1060,30 @@ func (sq *Queue) IncAllocatedResource(alloc *resources.Resource, nodeReported bo
return err
}
}
+ sq.Lock()
+ defer sq.Unlock()
// all OK update this queue
sq.allocatedResource = newAllocated
sq.updateAllocatedResourceMetrics()
return nil
}
+// small helper method to access sq.maxResource and avoid Clone() call
+func (sq *Queue) resourceFitsMaxRes(res *resources.Resource) bool {
+ sq.RLock()
+ defer sq.RUnlock()
+ return sq.maxResource.FitInMaxUndef(res)
+}
+
// DecAllocatedResource decrement the allocated resources for this queue (recursively)
// Guard against going below zero resources.
func (sq *Queue) DecAllocatedResource(alloc *resources.Resource) error {
if sq == nil {
return fmt.Errorf("queue is nil")
}
- sq.Lock()
- defer sq.Unlock()
// check this queue: failure stops checks
- if alloc != nil && !sq.allocatedResource.FitIn(alloc) {
+ if alloc != nil && !sq.resourceFitsAllocated(alloc) {
return fmt.Errorf("released allocation (%v) is larger than '%s' queue allocation (%v)",
alloc, sq.QueuePath, sq.allocatedResource)
}
@@ -1094,12 +1103,21 @@ func (sq *Queue) DecAllocatedResource(alloc *resources.Resource) error {
return err
}
}
+ sq.Lock()
+ defer sq.Unlock()
// all OK update the queue
sq.allocatedResource = resources.Sub(sq.allocatedResource, alloc)
sq.updateAllocatedResourceMetrics()
return nil
}
+// small helper method to access sq.allocatedResource and avoid Clone() call
+func (sq *Queue) resourceFitsAllocated(res *resources.Resource) bool {
+ sq.RLock()
+ defer sq.RUnlock()
+ return sq.allocatedResource.FitIn(res)
+}
+
// IncPreemptingResource increments the preempting resources for this queue (recursively).
func (sq *Queue) IncPreemptingResource(alloc *resources.Resource) {
if sq == nil {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]