This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 8928beec [YUNIKORN-2550] Fix locking in PartitionContext (#840)
8928beec is described below
commit 8928beecc46ed21db0531b612874ba92190706fc
Author: Peter Bacsko <[email protected]>
AuthorDate: Mon Apr 15 13:12:38 2024 +0200
[YUNIKORN-2550] Fix locking in PartitionContext (#840)
Closes: #840
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/scheduler/objects/node_collection.go | 2 ++
pkg/scheduler/partition.go | 46 ++++++++++++++------------------
2 files changed, 22 insertions(+), 26 deletions(-)
diff --git a/pkg/scheduler/objects/node_collection.go
b/pkg/scheduler/objects/node_collection.go
index f991eec5..cea1b2a6 100644
--- a/pkg/scheduler/objects/node_collection.go
+++ b/pkg/scheduler/objects/node_collection.go
@@ -37,6 +37,8 @@ var acceptAll = func(node *Node) bool {
return true
}
+// NodeCollection represents a collection of nodes for a partition.
+// Implementations of this interface must be internally synchronized to avoid data races.
type NodeCollection interface {
AddNode(node *Node) error
RemoveNode(nodeID string) *Node
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 0ba7db79..2f3ebfe4 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -157,18 +157,22 @@ func (pc *PartitionContext) updatePreemption(conf
configs.PartitionConfig) {
}
func (pc *PartitionContext) updatePartitionDetails(conf configs.PartitionConfig) error {
- pc.Lock()
- defer pc.Unlock()
+	// the following piece of code (before pc.Lock()) must be performed without locking
+	// to avoid lock order differences between PartitionContext and AppPlacementManager
if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue {
return fmt.Errorf("partition cannot be created without root queue")
}
log.Log(log.SchedPartition).Info("Updating placement manager rules on config reload")
- err := pc.placementManager.UpdateRules(conf.PlacementRules)
+ err := pc.getPlacementManager().UpdateRules(conf.PlacementRules)
if err != nil {
log.Log(log.SchedPartition).Info("New placement rules not activated, config reload failed", zap.Error(err))
return err
}
pc.updateNodeSortingPolicy(conf)
+
+ pc.Lock()
+ defer pc.Unlock()
+ pc.rules = &conf.PlacementRules
pc.updatePreemption(conf)
// start at the root: there is only one queue
queueConf := conf.Queues[0]
@@ -526,9 +530,6 @@ func (pc *PartitionContext) createQueue(name string, user
security.UserGroup) (*
// Get a node from the partition by nodeID.
func (pc *PartitionContext) GetNode(nodeID string) *objects.Node {
- pc.RLock()
- defer pc.RUnlock()
-
return pc.nodes.GetNode(nodeID)
}
@@ -584,18 +585,24 @@ func (pc *PartitionContext) updatePartitionResource(delta
*resources.Resource) {
}
}
-// Update the partition details when removing a node.
-// This locks the partition. The partition may not be locked when we process the allocation
-// additions to the node as that takes further app, queue or node locks
+// Update the partition details when adding a node.
func (pc *PartitionContext) addNodeToList(node *objects.Node) error {
- pc.Lock()
- defer pc.Unlock()
-	// Node can be added to the system to allow processing of the allocations
+	// we don't grab a lock here because we only update pc.nodes which is internally protected
if err := pc.nodes.AddNode(node); err != nil {
return fmt.Errorf("failed to add node %s to partition %s, error: %v", node.NodeID, pc.Name, err)
}
- metrics.GetSchedulerMetrics().IncActiveNodes()
+ pc.addNodeResources(node)
+ return nil
+}
+
+// Update metrics & resource tracking information.
+// This locks the partition. The partition may not be locked when we process the allocation
+// additions to the node as that takes further app, queue or node locks.
+func (pc *PartitionContext) addNodeResources(node *objects.Node) {
+ pc.Lock()
+ defer pc.Unlock()
+ metrics.GetSchedulerMetrics().IncActiveNodes()
// update/set the resources available in the cluster
if pc.totalPartitionResource == nil {
pc.totalPartitionResource = node.GetCapacity().Clone()
@@ -607,14 +614,10 @@ func (pc *PartitionContext) addNodeToList(node
*objects.Node) error {
zap.String("partitionName", pc.Name),
zap.String("nodeID", node.NodeID),
zap.Stringer("partitionResource", pc.totalPartitionResource))
- return nil
}
// removeNodeFromList removes the node from the list of partition nodes.
-// This locks the partition.
func (pc *PartitionContext) removeNodeFromList(nodeID string) *objects.Node {
- pc.Lock()
- defer pc.Unlock()
node := pc.nodes.RemoveNode(nodeID)
if node == nil {
log.Log(log.SchedPartition).Debug("node was not found, node already removed",
@@ -625,7 +628,6 @@ func (pc *PartitionContext) removeNodeFromList(nodeID
string) *objects.Node {
// Remove node from list of tracked nodes
metrics.GetSchedulerMetrics().DecActiveNodes()
-
log.Log(log.SchedPartition).Info("Removed node from available partition nodes",
zap.String("partitionName", pc.Name),
zap.String("nodeID", node.NodeID))
@@ -1027,8 +1029,6 @@ func (pc *PartitionContext) GetTotalAllocationCount() int
{
}
func (pc *PartitionContext) GetTotalNodeCount() int {
- pc.RLock()
- defer pc.RUnlock()
return pc.nodes.GetNodeCount()
}
@@ -1114,8 +1114,6 @@ func (pc *PartitionContext) cleanupExpiredApps() {
// GetNodes returns a slice of all nodes unfiltered from the iterator
func (pc *PartitionContext) GetNodes() []*objects.Node {
- pc.RLock()
- defer pc.RUnlock()
return pc.nodes.GetNodes()
}
@@ -1437,15 +1435,11 @@ func (pc *PartitionContext) GetStateTime() time.Time {
}
func (pc *PartitionContext) GetNodeSortingPolicyType() policies.SortingPolicy {
- pc.RLock()
- defer pc.RUnlock()
policy := pc.nodes.GetNodeSortingPolicy()
return policy.PolicyType()
}
func (pc *PartitionContext) GetNodeSortingResourceWeights() map[string]float64 {
- pc.RLock()
- defer pc.RUnlock()
policy := pc.nodes.GetNodeSortingPolicy()
return policy.ResourceWeights()
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]