This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 57544a62 [YUNIKORN-1993] race condition in allocation removal (#662)
57544a62 is described below
commit 57544a624ae18a1e972076426a5c5a7d17769ac3
Author: Wilfred Spiegelenburg <[email protected]>
AuthorDate: Fri Sep 22 16:13:56 2023 +1000
[YUNIKORN-1993] race condition in allocation removal (#662)
There is a race in the partition code between the removal of an allocation
and the timed state transition to Completed.
If the removal of an allocation is in progress when the state transition
(entry into the Completed state) removes the queue link, the queue will
not be updated and the allocation will leak.
No test case has been added, as the required timing is not reproducible in
unit or e2e tests.
Closes: #662
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
pkg/scheduler/partition.go | 22 +++++++++++++++-------
1 file changed, 15 insertions(+), 7 deletions(-)
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 8bf2786e..f4bde125 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -685,6 +685,9 @@ func (pc *PartitionContext) removeNodeAllocations(node
*objects.Node) ([]*object
zap.String("nodeID", node.NodeID))
continue
}
+ // Processing a removal while in the Completing state could
race with the state change.
+ // Retrieve the queue early before a possible race.
+ queue := app.GetQueue()
// check for an inflight replacement.
if alloc.GetReleaseCount() != 0 {
release := alloc.GetFirstRelease()
@@ -706,7 +709,7 @@ func (pc *PartitionContext) removeNodeAllocations(node
*objects.Node) ([]*object
// The reverse case is handled during
allocation.
if delta.HasNegativeValue() {
// this looks incorrect but the
delta is negative and the result will be a real decrease
- err :=
app.GetQueue().IncAllocatedResource(delta, false)
+ err :=
queue.IncAllocatedResource(delta, false)
// this should not happen as we
really decrease the value
if err != nil {
log.Log(log.SchedPartition).Warn("unexpected failure during queue update:
replacing placeholder",
@@ -761,16 +764,16 @@ func (pc *PartitionContext) removeNodeAllocations(node
*objects.Node) ([]*object
zap.String("nodeID", node.NodeID))
continue
}
- if err :=
app.GetQueue().DecAllocatedResource(alloc.GetAllocatedResource()); err != nil {
+ if err :=
queue.DecAllocatedResource(alloc.GetAllocatedResource()); err != nil {
log.Log(log.SchedPartition).Warn("failed to release
resources from queue",
zap.String("appID", alloc.GetApplicationID()),
zap.Error(err))
} else {
-
metrics.GetQueueMetrics(app.GetQueuePath()).IncReleasedContainer()
+
metrics.GetQueueMetrics(queue.GetQueuePath()).IncReleasedContainer()
}
// remove preempted resources
if alloc.IsPreempted() {
-
app.GetQueue().DecPreemptingResource(alloc.GetAllocatedResource())
+
queue.DecPreemptingResource(alloc.GetAllocatedResource())
}
if alloc.IsPlaceholder() {
pc.decPhAllocationCount(1)
@@ -1251,6 +1254,12 @@ func (pc *PartitionContext) removeAllocation(release
*si.AllocationRelease) ([]*
zap.Stringer("terminationType",
release.TerminationType))
return nil, nil
}
+ // Processing a removal while in the Completing state could race with
the state change.
+ // The race occurs between removing the allocation and updating the
queue after node processing.
+ // If the state change removes the queue link before we get to updating
the queue after the node we
+ // leave the resources as allocated on the queue. The queue cannot be
removed yet at this point as
+ // there are still allocations left. So retrieve the queue early to
sidestep the race.
+ queue := app.GetQueue()
// temp store for allocations manipulated
released := make([]*objects.Allocation, 0)
var confirmed *objects.Allocation
@@ -1295,7 +1304,7 @@ func (pc *PartitionContext) removeAllocation(release
*si.AllocationRelease) ([]*
for _, alloc := range released {
node := pc.GetNode(alloc.GetNodeID())
if node == nil {
- log.Log(log.SchedPartition).Info("node not found while
releasing allocation",
+ log.Log(log.SchedPartition).Warn("node not found while
releasing allocation",
zap.String("appID", appID),
zap.String("allocationId", alloc.GetUUID()),
zap.String("nodeID", alloc.GetNodeID()))
@@ -1342,7 +1351,6 @@ func (pc *PartitionContext) removeAllocation(release
*si.AllocationRelease) ([]*
}
}
if resources.StrictlyGreaterThanZero(total) {
- queue := app.GetQueue()
if err := queue.DecAllocatedResource(total); err != nil {
log.Log(log.SchedPartition).Warn("failed to release
resources from queue",
zap.String("appID", appID),
@@ -1353,7 +1361,7 @@ func (pc *PartitionContext) removeAllocation(release
*si.AllocationRelease) ([]*
}
}
if resources.StrictlyGreaterThanZero(totalPreempting) {
- app.GetQueue().DecPreemptingResource(totalPreempting)
+ queue.DecPreemptingResource(totalPreempting)
}
// if confirmed is set we can assume there will just be one alloc in
the released
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]