This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 9eee423b [YUNIKORN-3036] fix race prevention regression (#1014)
9eee423b is described below
commit 9eee423b3df00cc9dbe98ae05888ddf525753a07
Author: Wilfred Spiegelenburg <[email protected]>
AuthorDate: Mon Mar 3 11:26:58 2025 -0600
[YUNIKORN-3036] fix race prevention regression (#1014)
Reinstate the change from YUNIKORN-1993 that was reversed as part of
YUNIKORN-2658. Add a more explicit comment to not move the code.
Minor cleanup in function naming and a fix for the function length of
removeAllocation (lint fix).
Closes: #1014
Signed-off-by: Craig Condit <[email protected]>
---
pkg/scheduler/partition.go | 54 +++++++++++++++------------
pkg/scheduler/partition_test.go | 71 +++++++++---------------------------
pkg/scheduler/tests/recovery_test.go | 4 +-
3 files changed, 48 insertions(+), 81 deletions(-)
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index f8d34a70..e912f5dc 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -1416,7 +1416,9 @@ func (pc *PartitionContext) calculateNodesResourceUsage()
map[string][]int {
return mapResult
}
-func (pc *PartitionContext) generateReleased(release *si.AllocationRelease,
app *objects.Application) []*objects.Allocation {
+// processAllocationRelease processes the releases from the RM and removes the
allocation(s) as requested.
+// Updates the application which can trigger an application state change.
+func (pc *PartitionContext) processAllocationRelease(release
*si.AllocationRelease, app *objects.Application) []*objects.Allocation {
released := make([]*objects.Allocation, 0)
// when allocationKey is not specified, remove all allocations from the
app
allocationKey := release.GetAllocationKey()
@@ -1448,7 +1450,7 @@ func (pc *PartitionContext) generateReleased(release
*si.AllocationRelease, app
// removeAllocation removes the referenced allocation(s) from the applications
and nodes
// NOTE: this is a lock free call. It must NOT be called holding the
PartitionContext lock.
-func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease)
([]*objects.Allocation, *objects.Allocation) { //nolint:funlen
+func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease)
([]*objects.Allocation, *objects.Allocation) {
if release == nil {
return nil, nil
}
@@ -1468,25 +1470,20 @@ func (pc *PartitionContext) removeAllocation(release
*si.AllocationRelease) ([]*
return nil, nil
}
- // temp store for allocations manipulated
- released := pc.generateReleased(release, app)
- var confirmed *objects.Allocation
+ // **** DO NOT MOVE **** this must be called before any allocations are
released.
+ // Processing a removal while in the Completing state could race with
the state change. The race occurs between
+ // removing the allocation and updating the queue after node
processing. If the state change removes the queue link
+ // before we get to updating the queue after the node we leave the
resources as allocated on the queue. The queue
+ // will always exist at this point. Retrieving the queue now sidesteps
this.
+ queue := app.GetQueue()
- // all releases are collected: placeholder count needs updating for all
placeholder releases
- // regardless of what happens later
- phReleases := 0
- for _, r := range released {
- if r.IsPlaceholder() {
- phReleases++
- }
- }
- if phReleases > 0 {
- pc.decPhAllocationCount(phReleases)
- }
+ released := pc.processAllocationRelease(release, app)
+ pc.updatePhAllocationCount(released)
- // for each allocation to release, update the node and queue.
total := resources.NewResource()
totalPreempting := resources.NewResource()
+ var confirmed *objects.Allocation
+ // for each allocation to release, update the node and queue.
for _, alloc := range released {
node := pc.GetNode(alloc.GetNodeID())
if node == nil {
@@ -1537,13 +1534,6 @@ func (pc *PartitionContext) removeAllocation(release
*si.AllocationRelease) ([]*
}
}
- // Processing a removal while in the Completing state could race with
the state change.
- // The race occurs between removing the allocation and updating the
queue after node processing.
- // If the state change removes the queue link before we get to updating
the queue after the node we
- // leave the resources as allocated on the queue. The queue cannot be
removed yet at this point as
- // there are still allocations left. So retrieve the queue early to
sidestep the race.
- queue := app.GetQueue()
-
if resources.StrictlyGreaterThanZero(total) {
if err := queue.DecAllocatedResource(total); err != nil {
log.Log(log.SchedPartition).Warn("failed to release
resources from queue",
@@ -1581,6 +1571,22 @@ func (pc *PartitionContext) removeAllocation(release
*si.AllocationRelease) ([]*
return released, confirmed
}
+// updatePhAllocationCount checks the released allocations and updates the
partition context counter of allocated
+// placeholders.
+func (pc *PartitionContext) updatePhAllocationCount(released
[]*objects.Allocation) {
+ // all releases are collected: placeholder count needs updating for all
placeholder releases
+ // regardless of what happens later
+ phReleases := 0
+ for _, a := range released {
+ if a.IsPlaceholder() {
+ phReleases++
+ }
+ }
+ if phReleases > 0 {
+ pc.decPhAllocationCount(phReleases)
+ }
+}
+
func (pc *PartitionContext) removeForeignAllocation(allocID string) {
pc.Lock()
defer pc.Unlock()
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 4fac0f49..9df58cec 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -1541,9 +1541,7 @@ func TestGetQueue(t *testing.T) {
func TestTryAllocate(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
@@ -1620,12 +1618,7 @@ func TestTryAllocate(t *testing.T) {
func TestRequiredNodeReservation(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
- if result := partition.tryAllocate(); result != nil {
- t.Fatalf("empty cluster allocate returned allocation: %s",
result)
- }
+ assert.Assert(t, partition != nil, "partition create failed")
node := partition.nodes.GetNode(nodeID1)
if node == nil {
t.Fatal("node-1 should have been created")
@@ -1706,9 +1699,7 @@ func TestRequiredNodeReservation(t *testing.T) {
// allocate ask request with required node having non daemon set reservations
func TestRequiredNodeCancelOtherReservations(t *testing.T) {
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
@@ -1786,9 +1777,7 @@ func TestRequiredNodeCancelOtherReservations(t
*testing.T) {
// allocate ask request with required node having daemon set reservations
func TestRequiredNodeCancelDSReservations(t *testing.T) {
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
@@ -1871,9 +1860,7 @@ func TestRequiredNodeCancelDSReservations(t *testing.T) {
func TestRequiredNodeNotExist(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
@@ -1908,9 +1895,7 @@ func TestRequiredNodeNotExist(t *testing.T) {
// basic ds scheduling on specific node in first allocate run itself (without
any need for reservation)
func TestRequiredNodeAllocation(t *testing.T) {
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result.Request.String())
}
@@ -2076,9 +2061,7 @@ func TestPreemptionForRequiredNodeReservedAlloc(t
*testing.T) {
func TestPreemptionForRequiredNodeMultipleAttemptsAvoided(t *testing.T) {
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
app, testHandler := newApplicationWithHandler(appID1, "default",
"root.parent.sub-leaf")
res, err := resources.NewResourceFromConf(map[string]string{"vcore":
"8"})
@@ -2158,9 +2141,7 @@ func
getExpectedQueuesLimitsForPreemptionWithRequiredNode() map[string]map[strin
// setup the partition with existing allocations so we can test preemption
func setupPreemption(t *testing.T) (*PartitionContext, *objects.Application,
*objects.Application, *objects.Allocation, *objects.Allocation) {
partition := createPreemptionQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
@@ -2220,9 +2201,7 @@ func setupPreemption(t *testing.T) (*PartitionContext,
*objects.Application, *ob
// setup the partition in a state that we need for multiple tests
func setupPreemptionForRequiredNode(t *testing.T) (*PartitionContext,
*objects.Application) {
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
@@ -2300,9 +2279,7 @@ func setupPreemptionForRequiredNode(t *testing.T)
(*PartitionContext, *objects.A
func TestTryAllocateLarge(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
@@ -2333,9 +2310,7 @@ func TestTryAllocateLarge(t *testing.T) {
func TestAllocReserveNewNode(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned result: %s", result)
}
@@ -2404,9 +2379,7 @@ func TestAllocReserveNewNode(t *testing.T) {
func TestTryAllocateReserve(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryReservedAllocate(); result != nil {
t.Fatalf("empty cluster reserved allocate returned allocation:
%s", result)
}
@@ -2478,9 +2451,7 @@ func TestTryAllocateReserve(t *testing.T) {
func TestTryAllocateWithReserved(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if alloc := partition.tryReservedAllocate(); alloc != nil {
t.Fatalf("empty cluster reserved allocate returned allocation:
%v", alloc)
}
@@ -2502,9 +2473,7 @@ func TestTryAllocateWithReserved(t *testing.T) {
// reserve one node: scheduling should happen on the other
node2 := partition.GetNode(nodeID2)
- if node2 == nil {
- t.Fatal("expected node-2 to be returned got nil")
- }
+ assert.Assert(t, node2 != nil, "expected node-2 to be returned got nil")
partition.reserve(app, node2, ask)
if app.NodeReservedForAsk(allocKey) != nodeID2 {
t.Fatal("reservation failure for alloc-1 and node-2")
@@ -2533,9 +2502,7 @@ func TestTryAllocateWithReserved(t *testing.T) {
func TestScheduleRemoveReservedAsk(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
@@ -2623,9 +2590,7 @@ func TestScheduleRemoveReservedAsk(t *testing.T) {
// update the config with nodes registered and make sure that the root max and
guaranteed are not changed
func TestUpdateRootQueue(t *testing.T) {
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
res, err := resources.NewResourceFromConf(map[string]string{"vcore":
"20"})
assert.NilError(t, err, "resource creation failed")
assert.Assert(t, resources.Equals(res,
partition.totalPartitionResource), "partition resource not set as expected")
@@ -3927,9 +3892,7 @@ func TestGetNodeSortingPolicyWhenNewPartitionFromConfig(t
*testing.T) {
func TestTryAllocateMaxRunning(t *testing.T) {
const resType = "vcore"
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
diff --git a/pkg/scheduler/tests/recovery_test.go
b/pkg/scheduler/tests/recovery_test.go
index a7102077..46248043 100644
--- a/pkg/scheduler/tests/recovery_test.go
+++ b/pkg/scheduler/tests/recovery_test.go
@@ -675,9 +675,7 @@ func TestAppRecovery(t *testing.T) {
mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
app :=
serviceContext.Scheduler.GetClusterContext().GetApplication(appID1,
"[rm:123]default")
- if app == nil {
- t.Fatal("application not found after recovery")
- }
+ assert.Assert(t, app != nil, "application not found after recovery")
assert.Equal(t, app.ApplicationID, appID1)
assert.Equal(t, app.GetQueuePath(), "root.a")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]