This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
commit c5a901f9868ba6a3bca024f63fee71a38a8b0180 Author: Peter Bacsko <[email protected]> AuthorDate: Mon Apr 15 13:12:38 2024 +0200 [YUNIKORN-2550] Fix locking in PartitionContext (#840) Closes: #840 Signed-off-by: Peter Bacsko <[email protected]> --- pkg/scheduler/objects/node_collection.go | 2 ++ pkg/scheduler/partition.go | 45 ++++++++++++++------------------ 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/pkg/scheduler/objects/node_collection.go b/pkg/scheduler/objects/node_collection.go index f991eec5..cea1b2a6 100644 --- a/pkg/scheduler/objects/node_collection.go +++ b/pkg/scheduler/objects/node_collection.go @@ -37,6 +37,8 @@ var acceptAll = func(node *Node) bool { return true } +// NodeCollection represents a collection of nodes for a partition. +// Implementations of this interface must be internally synchronized to avoid data races. type NodeCollection interface { AddNode(node *Node) error RemoveNode(nodeID string) *Node diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index 5774cfab..61a42b45 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -157,18 +157,21 @@ func (pc *PartitionContext) updatePreemption(conf configs.PartitionConfig) { } func (pc *PartitionContext) updatePartitionDetails(conf configs.PartitionConfig) error { - pc.Lock() - defer pc.Unlock() + // the following piece of code (before pc.Lock()) must be performed without locking + // to avoid lock order differences between PartitionContext and AppPlacementManager if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue { return fmt.Errorf("partition cannot be created without root queue") } log.Log(log.SchedPartition).Info("Updating placement manager rules on config reload") - err := pc.placementManager.UpdateRules(conf.PlacementRules) + err := pc.getPlacementManager().UpdateRules(conf.PlacementRules) if err != nil { log.Log(log.SchedPartition).Info("New placement rules not activated, config reload failed", 
zap.Error(err)) return err } pc.updateNodeSortingPolicy(conf) + + pc.Lock() + defer pc.Unlock() pc.updatePreemption(conf) // start at the root: there is only one queue queueConf := conf.Queues[0] @@ -528,9 +531,6 @@ func (pc *PartitionContext) createQueue(name string, user security.UserGroup) (* // Get a node from the partition by nodeID. func (pc *PartitionContext) GetNode(nodeID string) *objects.Node { - pc.RLock() - defer pc.RUnlock() - return pc.nodes.GetNode(nodeID) } @@ -586,18 +586,24 @@ func (pc *PartitionContext) updatePartitionResource(delta *resources.Resource) { } } -// Update the partition details when removing a node. -// This locks the partition. The partition may not be locked when we process the allocation -// additions to the node as that takes further app, queue or node locks +// Update the partition details when adding a node. func (pc *PartitionContext) addNodeToList(node *objects.Node) error { - pc.Lock() - defer pc.Unlock() - // Node can be added to the system to allow processing of the allocations + // we don't grab a lock here because we only update pc.nodes which is internally protected if err := pc.nodes.AddNode(node); err != nil { return fmt.Errorf("failed to add node %s to partition %s, error: %v", node.NodeID, pc.Name, err) } - metrics.GetSchedulerMetrics().IncActiveNodes() + pc.addNodeResources(node) + return nil +} + +// Update metrics & resource tracking information. +// This locks the partition. The partition may not be locked when we process the allocation +// additions to the node as that takes further app, queue or node locks. 
+func (pc *PartitionContext) addNodeResources(node *objects.Node) { + pc.Lock() + defer pc.Unlock() + metrics.GetSchedulerMetrics().IncActiveNodes() // update/set the resources available in the cluster if pc.totalPartitionResource == nil { pc.totalPartitionResource = node.GetCapacity().Clone() @@ -609,14 +615,10 @@ func (pc *PartitionContext) addNodeToList(node *objects.Node) error { zap.String("partitionName", pc.Name), zap.String("nodeID", node.NodeID), zap.Stringer("partitionResource", pc.totalPartitionResource)) - return nil } // removeNodeFromList removes the node from the list of partition nodes. -// This locks the partition. func (pc *PartitionContext) removeNodeFromList(nodeID string) *objects.Node { - pc.Lock() - defer pc.Unlock() node := pc.nodes.RemoveNode(nodeID) if node == nil { log.Log(log.SchedPartition).Debug("node was not found, node already removed", @@ -627,7 +629,6 @@ func (pc *PartitionContext) removeNodeFromList(nodeID string) *objects.Node { // Remove node from list of tracked nodes metrics.GetSchedulerMetrics().DecActiveNodes() - log.Log(log.SchedPartition).Info("Removed node from available partition nodes", zap.String("partitionName", pc.Name), zap.String("nodeID", node.NodeID)) @@ -1029,8 +1030,6 @@ func (pc *PartitionContext) GetTotalAllocationCount() int { } func (pc *PartitionContext) GetTotalNodeCount() int { - pc.RLock() - defer pc.RUnlock() return pc.nodes.GetNodeCount() } @@ -1116,8 +1115,6 @@ func (pc *PartitionContext) cleanupExpiredApps() { // GetNodes returns a slice of all nodes unfiltered from the iterator func (pc *PartitionContext) GetNodes() []*objects.Node { - pc.RLock() - defer pc.RUnlock() return pc.nodes.GetNodes() } @@ -1439,15 +1436,11 @@ func (pc *PartitionContext) GetStateTime() time.Time { } func (pc *PartitionContext) GetNodeSortingPolicyType() policies.SortingPolicy { - pc.RLock() - defer pc.RUnlock() policy := pc.nodes.GetNodeSortingPolicy() return policy.PolicyType() } func (pc *PartitionContext) 
GetNodeSortingResourceWeights() map[string]float64 { - pc.RLock() - defer pc.RUnlock() policy := pc.nodes.GetNodeSortingPolicy() return policy.ResourceWeights() } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
