This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 1c2d0328 [YUNIKORN-2171] race between node removal and scheduling
(#724)
1c2d0328 is described below
commit 1c2d0328eab687cce71d1df4a2a50e62fee9ba86
Author: Wilfred Spiegelenburg <[email protected]>
AuthorDate: Tue Nov 28 12:25:11 2023 +1100
[YUNIKORN-2171] race between node removal and scheduling (#724)
When a node gets removed, the partition resources, and thus the root
queue max resources, are decreased. Cleanup of the allocations happens
after that. This means that for a short period of time the root queue
max resources are already decreased while the usage is not.
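If the queue headroom is limited by the root queue, then we could have a
race between the removal of the node allocations and the scheduler adding
a new allocation. Scheduling could fail with the root queue usage being
over the max resources.
The fix removes the allocations linked to the node first and only then
decreases the partition resources and the root queue max resources.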
Closes: #724
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
pkg/scheduler/partition.go | 48 +++++++++++++++++++++++++++++++---------------
1 file changed, 33 insertions(+), 15 deletions(-)
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 8c12e1cc..25d7f32b 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -611,9 +611,8 @@ func (pc *PartitionContext) addNodeToList(node
*objects.Node) error {
return nil
}
-// Update the partition details when removing a node.
-// This locks the partition. The partition may not be locked when we process
the allocation
-// removal from the node as that takes further app, queue or node locks
+// removeNodeFromList removes the node from the list of partition nodes.
+// This locks the partition.
func (pc *PartitionContext) removeNodeFromList(nodeID string) *objects.Node {
pc.Lock()
defer pc.Unlock()
@@ -628,26 +627,37 @@ func (pc *PartitionContext) removeNodeFromList(nodeID
string) *objects.Node {
// Remove node from list of tracked nodes
metrics.GetSchedulerMetrics().DecActiveNodes()
- // found the node cleanup the available resources, partition resources
cannot be nil at this point
+ log.Log(log.SchedPartition).Info("Removed node from available partition
nodes",
+ zap.String("partitionName", pc.Name),
+ zap.String("nodeID", node.NodeID))
+ return node
+}
+
+// removeNodeResources updates the partition and root queue resources as part
of the node removal process.
+// This locks the partition.
+func (pc *PartitionContext) removeNodeResources(node *objects.Node) {
+ pc.Lock()
+ defer pc.Unlock()
+ // cleanup the available resources, partition resources cannot be nil
at this point
pc.totalPartitionResource.SubFrom(node.GetCapacity())
pc.root.SetMaxResource(pc.totalPartitionResource)
log.Log(log.SchedPartition).Info("Updated available resources from
removed node",
zap.String("partitionName", pc.Name),
zap.String("nodeID", node.NodeID),
zap.Stringer("partitionResource", pc.totalPartitionResource))
- return node
}
-// Remove a node from the partition. It returns all removed and confirmed
allocations.
-// The removed allocations are all linked to the current node.
+// removeNode removes a node from the partition. It returns all released and
confirmed allocations.
+// The released allocations are all linked to the current node.
// The confirmed allocations are real allocations that are linked to
placeholders on the current node and are linked to
// other nodes.
// NOTE: this is a lock free call. It must NOT be called holding the
PartitionContext lock.
func (pc *PartitionContext) removeNode(nodeID string) ([]*objects.Allocation,
[]*objects.Allocation) {
- log.Log(log.SchedPartition).Info("removing node from partition",
+ log.Log(log.SchedPartition).Info("Removing node from partition",
zap.String("partition", pc.Name),
zap.String("nodeID", nodeID))
+ // remove the node: it will no longer be seen by the scheduling cycle
node := pc.removeNodeFromList(nodeID)
if node == nil {
return nil, nil
@@ -659,13 +669,19 @@ func (pc *PartitionContext) removeNode(nodeID string)
([]*objects.Allocation, []
_, app, ask := r.GetObjects()
pc.unReserve(app, node, ask)
}
- // cleanup the allocations linked to the node
- return pc.removeNodeAllocations(node)
+ // cleanup the allocations linked to the node. do this before changing
the root queue max: otherwise if
+ // scheduling and removal of a node race on a full cluster we could
cause all headroom to disappear for
+ // the time the allocations are not removed.
+ released, confirmed := pc.removeNodeAllocations(node)
+
+ // update the resource linked to this node, all allocations are
removed, queue usage should have decreased
+ pc.removeNodeResources(node)
+ return released, confirmed
}
-// Remove all allocations that are assigned to a node as part of the node
removal. This is not part of the node object
-// as updating the applications and queues is the only goal. Applications and
queues are not accessible from the node.
-// The removed and confirmed allocations are returned.
+// removeNodeAllocations removes all allocations that are assigned to a node
as part of the node removal. This is not part
+// of the node object as updating the applications and queues is the only
goal. Applications and queues are not accessible
+// from the node. The removed and confirmed allocations are returned.
// NOTE: this is a lock free call. It must NOT be called holding the
PartitionContext lock.
func (pc *PartitionContext) removeNodeAllocations(node *objects.Node)
([]*objects.Allocation, []*objects.Allocation) {
released := make([]*objects.Allocation, 0)
@@ -778,9 +794,11 @@ func (pc *PartitionContext) removeNodeAllocations(node
*objects.Node) ([]*object
// the allocation is removed so add it to the list that we
return
released = append(released, alloc)
metrics.GetQueueMetrics(queue.GetQueuePath()).IncReleasedContainer()
- log.Log(log.SchedPartition).Info("allocation removed from node",
+ log.Log(log.SchedPartition).Info("node removal: allocation
removed",
zap.String("nodeID", node.NodeID),
- zap.String("allocationId", allocID))
+ zap.String("queueName", queue.GetQueuePath()),
+ zap.String("appID", app.ApplicationID),
+ zap.Stringer("allocation", alloc))
}
// track the number of allocations: decrement the released allocation
AND increment with the confirmed
pc.updateAllocationCount(len(confirmed) - len(released))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]