This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 8928beec [YUNIKORN-2550] Fix locking in PartitionContext (#840)
8928beec is described below

commit 8928beecc46ed21db0531b612874ba92190706fc
Author: Peter Bacsko <[email protected]>
AuthorDate: Mon Apr 15 13:12:38 2024 +0200

    [YUNIKORN-2550] Fix locking in PartitionContext (#840)
    
    Closes: #840
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/scheduler/objects/node_collection.go |  2 ++
 pkg/scheduler/partition.go               | 46 ++++++++++++++------------------
 2 files changed, 22 insertions(+), 26 deletions(-)

diff --git a/pkg/scheduler/objects/node_collection.go b/pkg/scheduler/objects/node_collection.go
index f991eec5..cea1b2a6 100644
--- a/pkg/scheduler/objects/node_collection.go
+++ b/pkg/scheduler/objects/node_collection.go
@@ -37,6 +37,8 @@ var acceptAll = func(node *Node) bool {
        return true
 }
 
+// NodeCollection represents a collection of nodes for a partition.
+// Implementations of this interface must be internally synchronized to avoid data races.
 type NodeCollection interface {
        AddNode(node *Node) error
        RemoveNode(nodeID string) *Node
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 0ba7db79..2f3ebfe4 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -157,18 +157,22 @@ func (pc *PartitionContext) updatePreemption(conf configs.PartitionConfig) {
 }
 
 func (pc *PartitionContext) updatePartitionDetails(conf configs.PartitionConfig) error {
-       pc.Lock()
-       defer pc.Unlock()
+       // the following piece of code (before pc.Lock()) must be performed without locking
+       // to avoid lock order differences between PartitionContext and AppPlacementManager
        if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue {
                return fmt.Errorf("partition cannot be created without root queue")
        }
        log.Log(log.SchedPartition).Info("Updating placement manager rules on config reload")
-       err := pc.placementManager.UpdateRules(conf.PlacementRules)
+       err := pc.getPlacementManager().UpdateRules(conf.PlacementRules)
        if err != nil {
                log.Log(log.SchedPartition).Info("New placement rules not activated, config reload failed", zap.Error(err))
                return err
        }
        pc.updateNodeSortingPolicy(conf)
+
+       pc.Lock()
+       defer pc.Unlock()
+       pc.rules = &conf.PlacementRules
        pc.updatePreemption(conf)
        // start at the root: there is only one queue
        queueConf := conf.Queues[0]
@@ -526,9 +530,6 @@ func (pc *PartitionContext) createQueue(name string, user security.UserGroup) (*
 
 // Get a node from the partition by nodeID.
 func (pc *PartitionContext) GetNode(nodeID string) *objects.Node {
-       pc.RLock()
-       defer pc.RUnlock()
-
        return pc.nodes.GetNode(nodeID)
 }
 
@@ -584,18 +585,24 @@ func (pc *PartitionContext) updatePartitionResource(delta *resources.Resource) {
        }
 }
 
-// Update the partition details when removing a node.
-// This locks the partition. The partition may not be locked when we process the allocation
-// additions to the node as that takes further app, queue or node locks
+// Update the partition details when adding a node.
 func (pc *PartitionContext) addNodeToList(node *objects.Node) error {
-       pc.Lock()
-       defer pc.Unlock()
-       // Node can be added to the system to allow processing of the allocations
+       // we don't grab a lock here because we only update pc.nodes which is internally protected
        if err := pc.nodes.AddNode(node); err != nil {
                return fmt.Errorf("failed to add node %s to partition %s, error: %v", node.NodeID, pc.Name, err)
        }
-       metrics.GetSchedulerMetrics().IncActiveNodes()
 
+       pc.addNodeResources(node)
+       return nil
+}
+
+// Update metrics & resource tracking information.
+// This locks the partition. The partition may not be locked when we process the allocation
+// additions to the node as that takes further app, queue or node locks.
+func (pc *PartitionContext) addNodeResources(node *objects.Node) {
+       pc.Lock()
+       defer pc.Unlock()
+       metrics.GetSchedulerMetrics().IncActiveNodes()
        // update/set the resources available in the cluster
        if pc.totalPartitionResource == nil {
                pc.totalPartitionResource = node.GetCapacity().Clone()
@@ -607,14 +614,10 @@ func (pc *PartitionContext) addNodeToList(node *objects.Node) error {
                zap.String("partitionName", pc.Name),
                zap.String("nodeID", node.NodeID),
                zap.Stringer("partitionResource", pc.totalPartitionResource))
-       return nil
 }
 
 // removeNodeFromList removes the node from the list of partition nodes.
-// This locks the partition.
 func (pc *PartitionContext) removeNodeFromList(nodeID string) *objects.Node {
-       pc.Lock()
-       defer pc.Unlock()
        node := pc.nodes.RemoveNode(nodeID)
        if node == nil {
                log.Log(log.SchedPartition).Debug("node was not found, node already removed",
@@ -625,7 +628,6 @@ func (pc *PartitionContext) removeNodeFromList(nodeID string) *objects.Node {
 
        // Remove node from list of tracked nodes
        metrics.GetSchedulerMetrics().DecActiveNodes()
-
        log.Log(log.SchedPartition).Info("Removed node from available partition nodes",
                zap.String("partitionName", pc.Name),
                zap.String("nodeID", node.NodeID))
@@ -1027,8 +1029,6 @@ func (pc *PartitionContext) GetTotalAllocationCount() int {
 }
 
 func (pc *PartitionContext) GetTotalNodeCount() int {
-       pc.RLock()
-       defer pc.RUnlock()
        return pc.nodes.GetNodeCount()
 }
 
@@ -1114,8 +1114,6 @@ func (pc *PartitionContext) cleanupExpiredApps() {
 
 // GetNodes returns a slice of all nodes unfiltered from the iterator
 func (pc *PartitionContext) GetNodes() []*objects.Node {
-       pc.RLock()
-       defer pc.RUnlock()
        return pc.nodes.GetNodes()
 }
 
@@ -1437,15 +1435,11 @@ func (pc *PartitionContext) GetStateTime() time.Time {
 }
 
 func (pc *PartitionContext) GetNodeSortingPolicyType() policies.SortingPolicy {
-       pc.RLock()
-       defer pc.RUnlock()
        policy := pc.nodes.GetNodeSortingPolicy()
        return policy.PolicyType()
 }
 
 func (pc *PartitionContext) GetNodeSortingResourceWeights() map[string]float64 {
-       pc.RLock()
-       defer pc.RUnlock()
        policy := pc.nodes.GetNodeSortingPolicy()
        return policy.ResourceWeights()
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to