This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 4aec626c [YUNIKORN-2780] Remove unnecessary node ExistingAllocations
handling (#924)
4aec626c is described below
commit 4aec626c6bf9b61926a0681b801d22395af30fd6
Author: Craig Condit <[email protected]>
AuthorDate: Fri Aug 2 16:06:14 2024 -0500
[YUNIKORN-2780] Remove unnecessary node ExistingAllocations handling (#924)
With node recovery simplified, the shim never sends ExistingAllocations
on node registration. Remove code related to this as it will never be
executed.
Closes: #924
---
go.mod | 2 +-
go.sum | 4 +-
pkg/scheduler/context.go | 15 +--
pkg/scheduler/health_checker_test.go | 2 +-
pkg/scheduler/partition.go | 30 +-----
pkg/scheduler/partition_test.go | 142 +++++++++----------------
pkg/scheduler/tests/mock_rm_callback_test.go | 9 --
pkg/scheduler/tests/recovery_test.go | 149 +++++++++++++++++----------
pkg/scheduler/utilities_test.go | 8 +-
pkg/webservice/handlers_test.go | 36 +++++--
10 files changed, 187 insertions(+), 210 deletions(-)
diff --git a/go.mod b/go.mod
index 5f180926..7bbe0319 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@ module github.com/apache/yunikorn-core
go 1.21
require (
- github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240425182941-07f5695119a1
+ github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240731203810-92032b13d586
github.com/google/btree v1.1.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
diff --git a/go.sum b/go.sum
index f04fef47..9b2672af 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,5 @@
-github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240425182941-07f5695119a1
h1:v4J9L3MlW8BQfYnbq6FV2l3uyay3SqMS2Ffpo+SFat4=
-github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240425182941-07f5695119a1/go.mod
h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
+github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240731203810-92032b13d586
h1:ZVpo9Qj2/gvwX6Rl44UxkZBm2pZWEJDYWTramc9hwF0=
+github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240731203810-92032b13d586/go.mod
h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0
h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
diff --git a/pkg/scheduler/context.go b/pkg/scheduler/context.go
index 239a12d7..fe42c62d 100644
--- a/pkg/scheduler/context.go
+++ b/pkg/scheduler/context.go
@@ -607,8 +607,7 @@ func (cc *ClusterContext) addNode(nodeInfo *si.NodeInfo,
schedulable bool) error
return err
}
- existingAllocations :=
cc.convertAllocations(nodeInfo.ExistingAllocations)
- err := partition.AddNode(sn, existingAllocations)
+ err := partition.AddNode(sn)
sn.SendNodeAddedEvent()
if err != nil {
wrapped := errors.Join(errors.New("failure while adding new
node, node rejected with error: "), err)
@@ -751,7 +750,7 @@ func (cc *ClusterContext) processAllocations(request
*si.AllocationRequest) {
}
alloc := objects.NewAllocationFromSI(siAlloc)
- if err := partition.addAllocation(alloc); err != nil {
+ if err := partition.AddAllocation(alloc); err != nil {
rejectedAllocs = append(rejectedAllocs,
&si.RejectedAllocation{
AllocationKey: siAlloc.AllocationKey,
ApplicationID: siAlloc.ApplicationID,
@@ -855,16 +854,6 @@ func (cc *ClusterContext)
processAllocationReleases(releases []*si.AllocationRel
}
}
-// Convert the si allocation to a proposal to add to the node
-func (cc *ClusterContext) convertAllocations(allocations []*si.Allocation)
[]*objects.Allocation {
- convert := make([]*objects.Allocation, len(allocations))
- for current, allocation := range allocations {
- convert[current] = objects.NewAllocationFromSI(allocation)
- }
-
- return convert
-}
-
// Create a RM update event to notify RM of new allocations
// Lock free call, all updates occur via events.
func (cc *ClusterContext) notifyRMNewAllocation(rmID string, alloc
*objects.Allocation) {
diff --git a/pkg/scheduler/health_checker_test.go
b/pkg/scheduler/health_checker_test.go
index f0d0865e..c69f5a08 100644
--- a/pkg/scheduler/health_checker_test.go
+++ b/pkg/scheduler/health_checker_test.go
@@ -198,7 +198,7 @@ func TestGetSchedulerHealthStatusContext(t *testing.T) {
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{"memory": {Value:
-10}},
},
- }), []*objects.Allocation{})
+ }))
assert.NilError(t, err, "Unexpected error while adding a new node")
healthInfo = GetSchedulerHealthStatus(schedulerMetrics,
schedulerContext)
assert.Assert(t, !healthInfo.Healthy, "Scheduler should not be healthy")
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 25f1e083..3b3eae70 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -545,9 +545,9 @@ func (pc *PartitionContext) GetNode(nodeID string)
*objects.Node {
return pc.nodes.GetNode(nodeID)
}
-// Add the node to the partition and process the allocations that are reported
by the node.
+// Add the node to the partition.
// NOTE: this is a lock free call. It must NOT be called holding the
PartitionContext lock.
-func (pc *PartitionContext) AddNode(node *objects.Node, existingAllocations
[]*objects.Allocation) error {
+func (pc *PartitionContext) AddNode(node *objects.Node) error {
if node == nil {
return fmt.Errorf("cannot add 'nil' node to partition %s",
pc.Name)
}
@@ -560,26 +560,6 @@ func (pc *PartitionContext) AddNode(node *objects.Node,
existingAllocations []*o
if err := pc.addNodeToList(node); err != nil {
return err
}
-
- // Add allocations that exist on the node when added
- if len(existingAllocations) > 0 {
- for current, alloc := range existingAllocations {
- if err := pc.addAllocation(alloc); err != nil {
- // not expecting any inflight replacements on
node recovery
- released, _ := pc.removeNode(node.NodeID)
- log.Log(log.SchedPartition).Info("Failed to add
existing allocations, changes reversed",
- zap.String("nodeID", node.NodeID),
- zap.Int("existingAllocations",
len(existingAllocations)),
- zap.Int("releasedAllocations",
len(released)),
- zap.Int("processingAlloc", current),
- zap.Stringer("allocation", alloc),
- zap.Error(err))
- // update failed metric, active metrics are
tracked in add/remove from list
- metrics.GetSchedulerMetrics().IncFailedNodes()
- return err
- }
- }
- }
return nil
}
@@ -1143,11 +1123,11 @@ func (pc *PartitionContext) GetNodes() []*objects.Node {
return pc.nodes.GetNodes()
}
-// Add an allocation to the partition/node/application/queue during node
registration.
-// Queue max allocation is not checked as the allocation is part of a new node
addition.
+// Add an already bound allocation to the partition/node/application/queue.
+// Queue max allocation is not checked as the allocation came from the RM.
//
// NOTE: this is a lock free call. It must NOT be called holding the
PartitionContext lock.
-func (pc *PartitionContext) addAllocation(alloc *objects.Allocation) error {
+func (pc *PartitionContext) AddAllocation(alloc *objects.Allocation) error {
// cannot do anything with a nil alloc, should only happen if the shim
broke things badly
if alloc == nil {
return nil
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index db346432..d1357dc8 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -49,7 +49,7 @@ func setupUGM() {
}
func setupNode(t *testing.T, nodeID string, partition *PartitionContext,
nodeRes *resources.Resource) *objects.Node {
- err := partition.AddNode(newNodeMaxResource(nodeID, nodeRes), nil)
+ err := partition.AddNode(newNodeMaxResource(nodeID, nodeRes))
assert.NilError(t, err, "test "+nodeID+" add failed unexpected")
node := partition.GetNode(nodeID)
if node == nil {
@@ -192,7 +192,7 @@ func TestNewWithPlacement(t *testing.T) {
func TestAddNode(t *testing.T) {
partition, err := newBasePartition()
assert.NilError(t, err, "test partition create failed with error")
- err = partition.AddNode(nil, nil)
+ err = partition.AddNode(nil)
if err == nil {
t.Fatal("nil node add did not return error")
}
@@ -201,7 +201,7 @@ func TestAddNode(t *testing.T) {
// stop the partition node should be rejected
partition.markPartitionForRemoval()
assert.Assert(t, partition.isDraining(), "partition should have been
marked as draining")
- err = partition.AddNode(node, nil)
+ err = partition.AddNode(node)
if err == nil {
t.Error("test node add to draining partition should have
failed")
}
@@ -209,72 +209,25 @@ func TestAddNode(t *testing.T) {
// reset the state (hard no checks)
partition.stateMachine.SetState(objects.Active.String())
- err = partition.AddNode(node, nil)
+ err = partition.AddNode(node)
assert.NilError(t, err, "test node add failed unexpected")
assert.Equal(t, partition.nodes.GetNodeCount(), 1, "node list not
correct")
// add the same node nothing changes
- err = partition.AddNode(node, nil)
+ err = partition.AddNode(node)
if err == nil {
t.Fatal("add same test node worked unexpected")
}
assert.Equal(t, partition.nodes.GetNodeCount(), 1, "node list not
correct")
- err = partition.AddNode(newNodeMaxResource("test2",
resources.NewResource()), nil)
+ err = partition.AddNode(newNodeMaxResource("test2",
resources.NewResource()))
assert.NilError(t, err, "test node2 add failed unexpected")
assert.Equal(t, partition.nodes.GetNodeCount(), 2, "node list not
correct")
}
-func TestAddNodeWithAllocations(t *testing.T) {
- setupUGM()
- partition, err := newBasePartition()
- assert.NilError(t, err, "partition create failed")
-
- q := partition.GetQueue(defQueue)
- if q == nil {
- t.Fatal("expected default queue not found")
- }
-
- // add a new app
- app := newApplication(appID1, "default", defQueue)
- appRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000})
- err = partition.AddApplication(app)
- assert.NilError(t, err, "add application to partition should not have
failed")
-
- // add a node with allocations
- nodeRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 10000})
- node := newNodeMaxResource(nodeID1, nodeRes)
-
- // fail with an unknown app
- ask := newAllocationAsk("alloc-1", "unknown", appRes)
- alloc := markAllocated(nodeID1, ask)
- allocs := []*objects.Allocation{alloc}
- err = partition.AddNode(node, allocs)
- if err == nil {
- t.Errorf("add node to partition should have failed (app
missing)")
- }
- assert.Equal(t, partition.nodes.GetNodeCount(), 0, "error returned but
node still added to the partition (app)")
-
- // fail with a broken alloc
- ask = newAllocationAsk("alloc-1", appID1, appRes)
- alloc = markAllocated(nodeID1, ask)
- allocs = []*objects.Allocation{alloc}
- // add a node this must work
- err = partition.AddNode(node, allocs)
- // check the partition
- assert.NilError(t, err, "add node to partition should not have failed")
- assert.Equal(t, partition.nodes.GetNodeCount(), 1, "no error returned
but node not added to the partition")
- assert.Assert(t, resources.Equals(nodeRes,
partition.GetTotalPartitionResource()), "add node to partition did not update
total resources expected %v got %d", nodeRes,
partition.GetTotalPartitionResource())
- assert.Equal(t, partition.GetTotalAllocationCount(), 1, "add node to
partition did not add allocation")
-
- // check the queue usage
- assert.Assert(t, resources.Equals(q.GetAllocatedResource(), appRes),
"add node to partition did not update queue as expected")
- assertLimits(t, getTestUserGroup(), appRes)
-}
-
func TestRemoveNode(t *testing.T) {
setupUGM()
partition, err := newBasePartition()
assert.NilError(t, err, "test partition create failed with error")
- err = partition.AddNode(newNodeMaxResource("test",
resources.NewResource()), nil)
+ err = partition.AddNode(newNodeMaxResource("test",
resources.NewResource()))
assert.NilError(t, err, "test node add failed unexpected")
assert.Equal(t, 1, partition.nodes.GetNodeCount(), "node list not
correct")
@@ -305,9 +258,10 @@ func TestRemoveNodeWithAllocations(t *testing.T) {
ask := newAllocationAsk("alloc-1", appID1, appRes)
alloc := markAllocated(nodeID1, ask)
allocAllocationKey := alloc.GetAllocationKey()
- allocs := []*objects.Allocation{alloc}
- err = partition.AddNode(node, allocs)
+ err = partition.AddNode(node)
assert.NilError(t, err, "add node to partition should not have failed")
+ err = partition.AddAllocation(alloc)
+ assert.NilError(t, err)
// get what was allocated
allocated := node.GetAllAllocations()
assert.Equal(t, 1, len(allocated), "allocation not added correctly")
@@ -350,9 +304,10 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
appRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
ask := newAllocationAskTG("placeholder", appID1, taskGroup, appRes,
true)
ph := markAllocated(nodeID1, ask)
- allocs := []*objects.Allocation{ph}
- err = partition.AddNode(node1, allocs)
+ err = partition.AddNode(node1)
assert.NilError(t, err, "add node1 to partition should not have failed")
+ err = partition.AddAllocation(ph)
+ assert.NilError(t, err)
// get what was allocated
allocated := node1.GetAllAllocations()
assert.Equal(t, 1, len(allocated), "allocation not added correctly to
node1 expected 1 got: %v", allocated)
@@ -375,7 +330,7 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
// double link as if the replacement is ongoing
ph.SetRelease(alloc)
- allocs = app.GetAllAllocations()
+ allocs := app.GetAllAllocations()
assert.Equal(t, len(allocs), 1, "expected one allocation for the app
(placeholder)")
assertLimits(t, getTestUserGroup(), appRes)
@@ -399,7 +354,7 @@ func TestCalculateNodesResourceUsage(t *testing.T) {
assert.NilError(t, err, "partition create failed")
oldCapacity :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100})
node := newNodeMaxResource(nodeID1, oldCapacity)
- err = partition.AddNode(node, nil)
+ err = partition.AddNode(node)
assert.NilError(t, err)
occupiedResources :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
@@ -456,12 +411,13 @@ func TestPlaceholderDataWithPlaceholderPreemption(t
*testing.T) {
ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 1, false)
alloc := markAllocated(nodeID1, ask)
- allocs := []*objects.Allocation{alloc}
node1 := newNodeMaxResource(nodeID1, newRes)
- err = partition.AddNode(node1, allocs)
+ err = partition.AddNode(node1)
assert.NilError(t, err, "add node1 to partition should not have failed")
assert.Equal(t, 1, partition.nodes.GetNodeCount(), "node list not
correct")
+ err = partition.AddAllocation(alloc)
+ assert.NilError(t, err)
// get what was allocated
allocated := node1.GetAllAllocations()
@@ -469,7 +425,7 @@ func TestPlaceholderDataWithPlaceholderPreemption(t
*testing.T) {
assert.Assert(t, resources.Equals(node1.GetAllocatedResource(),
appRes), "allocation not added correctly to node1")
node2 := newNodeMaxResource(nodeID2, newRes)
- err = partition.AddNode(node2, nil)
+ err = partition.AddNode(node2)
assert.NilError(t, err, "test node add failed unexpected")
assert.Equal(t, 2, partition.nodes.GetNodeCount(), "node list not
correct")
assert.Assert(t,
resources.Equals(partition.GetQueue(defQueue).GetAllocatedResource(), appRes),
"Queue allocated resource is not correct")
@@ -582,12 +538,12 @@ func TestPlaceholderDataWithNodeRemoval(t *testing.T) {
// add a node with allocation: must have the correct app1 added already
ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 1, false)
alloc := markAllocated(nodeID1, ask)
- allocs := []*objects.Allocation{alloc}
-
node1 := newNodeMaxResource(nodeID1, newRes)
- err = partition.AddNode(node1, allocs)
+ err = partition.AddNode(node1)
assert.NilError(t, err, "add node1 to partition should not have failed")
assert.Equal(t, 1, partition.nodes.GetNodeCount(), "node list not
correct")
+ err = partition.AddAllocation(alloc)
+ assert.NilError(t, err)
// get what was allocated
allocated := node1.GetAllAllocations()
@@ -595,7 +551,7 @@ func TestPlaceholderDataWithNodeRemoval(t *testing.T) {
assert.Assert(t, resources.Equals(node1.GetAllocatedResource(),
appRes), "allocation not added correctly to node1")
node2 := newNodeMaxResource(nodeID2, newRes)
- err = partition.AddNode(node2, nil)
+ err = partition.AddNode(node2)
assert.NilError(t, err, "test node add failed unexpected")
assert.Equal(t, 2, partition.nodes.GetNodeCount(), "node list not
correct")
assert.Assert(t,
resources.Equals(partition.GetQueue(defQueue).GetAllocatedResource(), appRes),
"Queue allocated resource is not correct")
@@ -665,12 +621,13 @@ func TestPlaceholderDataWithRemoval(t *testing.T) {
// add a node with allocation: must have the correct app1 added already
ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 1, false)
alloc := markAllocated(nodeID1, ask)
- allocs := []*objects.Allocation{alloc}
node1 := newNodeMaxResource(nodeID1, newRes)
- err = partition.AddNode(node1, allocs)
+ err = partition.AddNode(node1)
assert.NilError(t, err, "add node1 to partition should not have failed")
assert.Equal(t, 1, partition.nodes.GetNodeCount(), "node list not
correct")
+ err = partition.AddAllocation(alloc)
+ assert.NilError(t, err)
// get what was allocated
allocated := node1.GetAllAllocations()
@@ -678,7 +635,7 @@ func TestPlaceholderDataWithRemoval(t *testing.T) {
assert.Assert(t, resources.Equals(node1.GetAllocatedResource(),
appRes), "allocation not added correctly to node1")
node2 := newNodeMaxResource(nodeID2, newRes)
- err = partition.AddNode(node2, nil)
+ err = partition.AddNode(node2)
assert.NilError(t, err, "test node add failed unexpected")
assert.Equal(t, 2, partition.nodes.GetNodeCount(), "node list not
correct")
assert.Assert(t,
resources.Equals(partition.GetQueue(defQueue).GetAllocatedResource(), appRes),
"Queue allocated resource is not correct")
@@ -750,9 +707,10 @@ func TestRemoveNodeWithReplacement(t *testing.T) {
appRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
ask := newAllocationAskAll("placeholder", appID1, taskGroup, appRes, 1,
true)
ph := markAllocated(nodeID1, ask)
- allocs := []*objects.Allocation{ph}
- err = partition.AddNode(node1, allocs)
+ err = partition.AddNode(node1)
assert.NilError(t, err, "add node1 to partition should not have failed")
+ err = partition.AddAllocation(ph)
+ assert.NilError(t, err)
// get what was allocated
allocated := node1.GetAllAllocations()
assert.Equal(t, 1, len(allocated), "allocation not added correctly to
node1")
@@ -782,7 +740,7 @@ func TestRemoveNodeWithReplacement(t *testing.T) {
// double link as if the replacement is ongoing
ph.SetRelease(alloc)
- allocs = app.GetAllAllocations()
+ allocs := app.GetAllAllocations()
assert.Equal(t, len(allocs), 1, "expected one allocation for the app
(placeholder)")
assertLimits(t, getTestUserGroup(), appRes)
@@ -820,9 +778,10 @@ func TestRemoveNodeWithReal(t *testing.T) {
appRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
ask := newAllocationAskAll("placeholder", appID1, taskGroup, appRes, 1,
true)
ph := markAllocated(nodeID1, ask)
- allocs := []*objects.Allocation{ph}
- err = partition.AddNode(node1, allocs)
+ err = partition.AddNode(node1)
assert.NilError(t, err, "add node1 to partition should not have failed")
+ err = partition.AddAllocation(ph)
+ assert.NilError(t, err)
// get what was allocated
allocated := node1.GetAllAllocations()
assert.Equal(t, 1, len(allocated), "allocation not added correctly to
node1")
@@ -852,7 +811,7 @@ func TestRemoveNodeWithReal(t *testing.T) {
// double link as if the replacement is ongoing
ph.SetRelease(alloc)
- allocs = app.GetAllAllocations()
+ allocs := app.GetAllAllocations()
assert.Equal(t, len(allocs), 1, "expected one allocation for the app
(placeholder)")
// remove the node with the real allocation
@@ -1075,7 +1034,7 @@ func TestRemoveApp(t *testing.T) {
appRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000})
ask := newAllocationAsk("alloc-nr", appNotRemoved, appRes)
alloc := markAllocated(nodeID1, ask)
- err = partition.addAllocation(alloc)
+ err = partition.AddAllocation(alloc)
assert.NilError(t, err, "add allocation to partition should not have
failed")
assertLimits(t, getTestUserGroup(), appRes)
@@ -1102,7 +1061,7 @@ func TestRemoveApp(t *testing.T) {
ask = newAllocationAsk("alloc-1", appID1, appRes)
alloc = markAllocated(nodeID1, ask)
- err = partition.addAllocation(alloc)
+ err = partition.AddAllocation(alloc)
assert.NilError(t, err, "add allocation to partition should not have
failed")
assertLimits(t, getTestUserGroup(), resources.Multiply(appRes, 2))
@@ -1136,14 +1095,14 @@ func TestRemoveAppAllocs(t *testing.T) {
appRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000})
ask := newAllocationAsk("alloc-nr", appNotRemoved, appRes)
alloc := markAllocated(nodeID1, ask)
- err = partition.addAllocation(alloc)
+ err = partition.AddAllocation(alloc)
assert.NilError(t, err, "add allocation to partition should not have
failed")
assertLimits(t, getTestUserGroup(), appRes)
ask = newAllocationAsk("alloc-1", appNotRemoved, appRes)
allocationKey := "alloc-1"
alloc = markAllocated(nodeID1, ask)
- err = partition.addAllocation(alloc)
+ err = partition.AddAllocation(alloc)
assert.NilError(t, err, "add allocation to partition should not have
failed")
assertLimits(t, getTestUserGroup(), resources.Multiply(appRes, 2))
release := &si.AllocationRelease{
@@ -1197,11 +1156,11 @@ func TestRemoveAllPlaceholderAllocs(t *testing.T) {
assert.NilError(t, err, "failed to create resource")
phAsk1 := newAllocationAskTG(phID, appID1, taskGroup, res, true)
phAlloc1 := markAllocated(nodeID1, phAsk1)
- err = partition.addAllocation(phAlloc1)
+ err = partition.AddAllocation(phAlloc1)
assert.NilError(t, err, "could not add allocation to partition")
phAsk2 := newAllocationAskTG(phID2, appID1, taskGroup, res, true)
phAlloc2 := markAllocated(nodeID1, phAsk2)
- err = partition.addAllocation(phAlloc2)
+ err = partition.AddAllocation(phAlloc2)
assert.NilError(t, err, "could not add allocation to partition")
partition.removeAllocation(&si.AllocationRelease{
PartitionName: "default",
@@ -2551,7 +2510,7 @@ func TestScheduleRemoveReservedAsk(t *testing.T) {
// add a node
node := newNodeMaxResource("node-3", res)
- err = partition.AddNode(node, nil)
+ err = partition.AddNode(node)
assert.NilError(t, err, "failed to add node node-3 to the partition")
// Try to allocate one of the reservation. We go directly to the root
queue not using the partition otherwise
// we confirm before we get back in the test code and cannot remove the
ask
@@ -2635,7 +2594,7 @@ func TestUpdateRootQueue(t *testing.T) {
// add new node, node 3 with 'memory' resource type
res1, err1 := resources.NewResourceFromConf(map[string]string{"vcore":
"20", "memory": "50"})
assert.NilError(t, err1, "resource creation failed")
- err = partition.AddNode(newNodeMaxResource("node-3", res1), nil)
+ err = partition.AddNode(newNodeMaxResource("node-3", res1))
assert.NilError(t, err, "test node3 add failed unexpected")
// root max resource gets updated with 'memory' resource type
@@ -2778,7 +2737,7 @@ func TestUpdateNode(t *testing.T) {
newRes, err :=
resources.NewResourceFromConf(map[string]string{"memory": "400M", "vcore":
"30"})
assert.NilError(t, err, "failed to create resource")
- err = partition.AddNode(newNodeMaxResource("test", newRes), nil)
+ err = partition.AddNode(newNodeMaxResource("test", newRes))
assert.NilError(t, err, "test node add failed unexpected")
assert.Equal(t, 1, partition.nodes.GetNodeCount(), "node list not
correct")
@@ -3862,9 +3821,9 @@ func TestUserHeadroom(t *testing.T) {
var res *resources.Resource
res, err = resources.NewResourceFromConf(map[string]string{"memory":
"10", "vcores": "10"})
assert.NilError(t, err, "failed to create basic resource")
- err = partition.AddNode(newNodeMaxResource("node-1", res), nil)
+ err = partition.AddNode(newNodeMaxResource("node-1", res))
assert.NilError(t, err, "test node1 add failed unexpected")
- err = partition.AddNode(newNodeMaxResource("node-2", res), nil)
+ err = partition.AddNode(newNodeMaxResource("node-2", res))
assert.NilError(t, err, "test node2 add failed unexpected")
app1 := newApplication(appID1, "default", "root.parent.sub-leaf")
@@ -4219,7 +4178,7 @@ func TestLimitMaxApplications(t *testing.T) {
// add node1
nodeRes, err :=
resources.NewResourceFromConf(map[string]string{"memory": "10", "vcores": "10"})
assert.NilError(t, err, "failed to create basic
resource")
- err = partition.AddNode(newNodeMaxResource("node-1",
nodeRes), nil)
+ err = partition.AddNode(newNodeMaxResource("node-1",
nodeRes))
assert.NilError(t, err, "test node1 add failed
unexpected")
resMap := map[string]string{"memory": "2", "vcores":
"2"}
@@ -4375,7 +4334,7 @@ func TestLimitMaxApplicationsForReservedAllocation(t
*testing.T) {
nodeRes, err :=
resources.NewResourceFromConf(map[string]string{"memory": "10", "vcores": "10"})
assert.NilError(t, err, "failed to create basic
resource")
node := newNodeMaxResource("node-1", nodeRes)
- err = partition.AddNode(node, nil)
+ err = partition.AddNode(node)
assert.NilError(t, err, "test node1 add failed
unexpected")
resMap := map[string]string{"memory": "2", "vcores":
"2"}
@@ -4502,8 +4461,9 @@ func
TestPlaceholderAllocationAndReplacementAfterRecovery(t *testing.T) {
appRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
phAsk := newAllocationAskTG("placeholder", appID1, taskGroup, appRes,
true)
ph := markAllocated(nodeID1, phAsk)
- allocs := []*objects.Allocation{ph}
- err = partition.AddNode(node1, allocs)
+ err = partition.AddNode(node1)
+ assert.NilError(t, err)
+ err = partition.AddAllocation(ph)
assert.NilError(t, err)
// add a placeholder ask with a different taskgroup
diff --git a/pkg/scheduler/tests/mock_rm_callback_test.go
b/pkg/scheduler/tests/mock_rm_callback_test.go
index eef3ad3f..3bc8c63d 100644
--- a/pkg/scheduler/tests/mock_rm_callback_test.go
+++ b/pkg/scheduler/tests/mock_rm_callback_test.go
@@ -174,15 +174,6 @@ func (m *mockRMCallback) waitForMinAcceptedNodes(tb
testing.TB, minNumNode int,
}
}
-func (m *mockRMCallback) waitForRejectedNode(t *testing.T, nodeID string,
timeoutMs int) {
- err := common.WaitForCondition(10*time.Millisecond,
time.Duration(timeoutMs)*time.Millisecond, func() bool {
- m.RLock()
- defer m.RUnlock()
- return m.rejectedNodes[nodeID]
- })
- assert.NilError(t, err, "Failed to wait for node state to become
rejected: %s, called from: %s", nodeID, caller())
-}
-
func (m *mockRMCallback) waitForAllocations(t *testing.T, nAlloc int,
timeoutMs int) {
var allocLen int
err := common.WaitForCondition(10*time.Millisecond,
time.Duration(timeoutMs)*time.Millisecond, func() bool {
diff --git a/pkg/scheduler/tests/recovery_test.go
b/pkg/scheduler/tests/recovery_test.go
index 09c8d4f4..e1393999 100644
--- a/pkg/scheduler/tests/recovery_test.go
+++ b/pkg/scheduler/tests/recovery_test.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/entrypoint"
+ "github.com/apache/yunikorn-core/pkg/scheduler"
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
@@ -256,8 +257,7 @@ func TestSchedulerRecovery(t *testing.T) {
"vcore": {Value: 20},
},
},
- Action: si.NodeInfo_CREATE,
- ExistingAllocations:
mockRM.nodeAllocations["node-1:1234"],
+ Action: si.NodeInfo_CREATE,
},
{
NodeID: "node-2:1234",
@@ -268,8 +268,7 @@ func TestSchedulerRecovery(t *testing.T) {
"vcore": {Value: 20},
},
},
- Action: si.NodeInfo_CREATE,
- ExistingAllocations:
mockRM.nodeAllocations["node-2:1234"],
+ Action: si.NodeInfo_CREATE,
},
},
RmID: "rm:123",
@@ -280,6 +279,15 @@ func TestSchedulerRecovery(t *testing.T) {
// verify partition info
part = ms.scheduler.GetClusterContext().GetPartition(ms.partitionName)
+
+ // add allocs to partition
+ node1Allocations := mockRM.nodeAllocations["node-1:1234"]
+ err = registerAllocations(part, node1Allocations)
+ assert.NilError(t, err)
+ node2Allocations := mockRM.nodeAllocations["node-2:1234"]
+ err = registerAllocations(part, node2Allocations)
+ assert.NilError(t, err)
+
// verify apps in this partition
assert.Equal(t, 1, len(part.GetApplications()))
assert.Equal(t, appID1, part.GetApplications()[0].ApplicationID)
@@ -289,8 +297,6 @@ func TestSchedulerRecovery(t *testing.T) {
// verify nodes
assert.Equal(t, 2, part.GetTotalNodeCount(), "incorrect recovered node
count")
- node1Allocations := mockRM.nodeAllocations["node-1:1234"]
- node2Allocations := mockRM.nodeAllocations["node-2:1234"]
assert.Equal(t, len(node1Allocations),
len(part.GetNode("node-1:1234").GetAllAllocations()), "allocations on node-1
not as expected")
assert.Equal(t, len(node2Allocations),
len(part.GetNode("node-2:1234").GetAllAllocations()), "allocations on node-1
not as expected")
@@ -443,14 +449,17 @@ func TestSchedulerRecovery2Allocations(t *testing.T) {
"vcore": {Value: 20},
},
},
- Action: si.NodeInfo_CREATE,
- ExistingAllocations:
mockRM.nodeAllocations["node-1:1234"],
+ Action: si.NodeInfo_CREATE,
},
},
RmID: "rm:123",
})
assert.NilError(t, err, "NodeRequest nodes and app for recovery failed")
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
+ allocs := mockRM.nodeAllocations["node-1:1234"]
+ err = ms.proxy.UpdateAllocation(&si.AllocationRequest{Allocations:
allocs, RmID: "rm:123"})
+ assert.NilError(t, err, "failed to update allocations")
+ ms.mockRM.waitForAllocations(t, len(allocs), 1000)
recoveredApp := ms.getApplication(appID1)
// verify app state
assert.Equal(t, recoveredApp.CurrentState(), objects.Running.String())
@@ -482,24 +491,6 @@ func TestSchedulerRecoveryWithoutAppInfo(t *testing.T) {
},
},
Action: si.NodeInfo_CREATE,
- ExistingAllocations: []*si.Allocation{
- {
- AllocationKey:
"allocation-key-01",
- ApplicationID: "app-01",
- PartitionName: "default",
- NodeID: "node-1:1234",
- ResourcePerAlloc: &si.Resource{
- Resources:
map[string]*si.Quantity{
- common.Memory: {
- Value:
1024,
- },
- common.CPU: {
- Value:
1,
- },
- },
- },
- },
- },
},
{
NodeID: "node-2:1234",
@@ -518,13 +509,36 @@ func TestSchedulerRecoveryWithoutAppInfo(t *testing.T) {
assert.NilError(t, err, "NodeRequest nodes and apps failed")
// waiting for recovery
- // node-1 should be rejected as some of allocations cannot be recovered
- ms.mockRM.waitForRejectedNode(t, "node-1:1234", 1000)
+ ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
- // verify partition resources
part := ms.scheduler.GetClusterContext().GetPartition("[rm:123]default")
- assert.Equal(t, part.GetTotalNodeCount(), 1)
+ err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ Allocations: []*si.Allocation{
+ {
+ AllocationKey: "allocation-key-01",
+ ApplicationID: "app-01",
+ PartitionName: "default",
+ NodeID: "node-1:1234",
+ ResourcePerAlloc: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ common.Memory: {
+ Value: 1024,
+ },
+ common.CPU: {
+ Value: 1,
+ },
+ },
+ },
+ },
+ },
+ RmID: "rm:123",
+ })
+
+ assert.NilError(t, err)
+
+ // verify partition resources
+ assert.Equal(t, part.GetTotalNodeCount(), 2)
assert.Equal(t, part.GetTotalAllocationCount(), 0)
assert.Equal(t,
part.GetNode("node-2:1234").GetAllocatedResource().Resources[common.Memory],
resources.Quantity(0))
@@ -549,21 +563,27 @@ func TestSchedulerRecoveryWithoutAppInfo(t *testing.T) {
},
},
Action: si.NodeInfo_CREATE,
- ExistingAllocations: []*si.Allocation{
- {
- AllocationKey:
"allocation-key-01",
- ApplicationID: "app-01",
- PartitionName: "default",
- NodeID: "node-1:1234",
- ResourcePerAlloc: &si.Resource{
- Resources:
map[string]*si.Quantity{
- common.Memory: {
- Value:
100,
- },
- common.CPU: {
- Value:
1,
- },
- },
+ },
+ },
+ RmID: "rm:123",
+ })
+ assert.NilError(t, err, "NodeRequest re-register nodes and app failed")
+ ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
+
+ err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ Allocations: []*si.Allocation{
+ {
+ AllocationKey: "allocation-key-01",
+ ApplicationID: "app-01",
+ PartitionName: "default",
+ NodeID: "node-1:1234",
+ ResourcePerAlloc: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ common.Memory: {
+ Value: 100,
+ },
+ common.CPU: {
+ Value: 1,
},
},
},
@@ -571,8 +591,8 @@ func TestSchedulerRecoveryWithoutAppInfo(t *testing.T) {
},
RmID: "rm:123",
})
- assert.NilError(t, err, "NodeRequest re-register nodes and app failed")
- ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
+ assert.NilError(t, err)
+ ms.mockRM.waitForAllocations(t, 1, 1000)
assert.Equal(t, part.GetTotalNodeCount(), 2)
assert.Equal(t, part.GetTotalAllocationCount(), 1)
@@ -921,8 +941,7 @@ partitions:
"vcore": {Value: 20},
},
},
- Action: si.NodeInfo_CREATE,
- ExistingAllocations: toRecover["node-1:1234"],
+ Action: si.NodeInfo_CREATE,
},
{
NodeID: "node-2:1234",
@@ -933,8 +952,7 @@ partitions:
"vcore": {Value: 20},
},
},
- Action: si.NodeInfo_CREATE,
- ExistingAllocations: toRecover["node-2:1234"],
+ Action: si.NodeInfo_CREATE,
},
},
RmID: "rm:123",
@@ -946,6 +964,12 @@ partitions:
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
ms.mockRM.waitForAcceptedApplication(t, appID1, 1000)
+
+ err = registerAllocations(part, toRecover["node-1:1234"])
+ assert.NilError(t, err)
+ err = registerAllocations(part, toRecover["node-2:1234"])
+ assert.NilError(t, err)
+
// now the queue should have been created under root.app-1-namespace
assert.Equal(t, len(rootQ.GetCopyOfChildren()), 1)
appQueue = part.GetQueue("root.app-1-namespace")
@@ -1005,8 +1029,7 @@ func TestPlaceholderRecovery(t *testing.T) { //nolint:funlen
"vcore": {Value: 20},
},
},
- Action: si.NodeInfo_CREATE,
- ExistingAllocations: existingAllocations,
+ Action: si.NodeInfo_CREATE,
},
},
RmID: "rm:123",
@@ -1014,6 +1037,14 @@ func TestPlaceholderRecovery(t *testing.T) { //nolint:funlen
assert.NilError(t, err, "NodeRequest nodes and app for recovery failed")
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
+ // Add existing allocations
+ err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ Allocations: existingAllocations,
+ RmID: "rm:123",
+ })
+ assert.NilError(t, err, "AllocationRequest failed for existing allocations")
+ ms.mockRM.waitForAllocations(t, len(existingAllocations), 1000)
+
// Add a new placeholder ask with a different task group
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Asks: []*si.AllocationAsk{
@@ -1033,7 +1064,7 @@ func TestPlaceholderRecovery(t *testing.T) { //nolint:funlen
RmID: "rm:123",
})
assert.NilError(t, err, "AllocationRequest failed for placeholder ask")
- ms.mockRM.waitForAllocations(t, 1, 1000)
+ ms.mockRM.waitForAllocations(t, len(existingAllocations)+1, 1000)
// Add two real asks
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
@@ -1112,3 +1143,13 @@ func TestPlaceholderRecovery(t *testing.T) { //nolint:funlen
ms.mockRM.waitForApplicationState(t, appID1, "Completing", 1000)
}
+
+func registerAllocations(partition *scheduler.PartitionContext, allocs []*si.Allocation) error {
+ for _, alloc := range allocs {
+ err := partition.AddAllocation(objects.NewAllocationFromSI(alloc))
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index a97bd139..96e6d8e5 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -619,9 +619,9 @@ func createQueuesNodes(t *testing.T) *PartitionContext {
var res *resources.Resource
res, err = resources.NewResourceFromConf(map[string]string{"vcore": "10"})
assert.NilError(t, err, "failed to create basic resource")
- err = partition.AddNode(newNodeMaxResource("node-1", res), nil)
+ err = partition.AddNode(newNodeMaxResource("node-1", res))
assert.NilError(t, err, "test node1 add failed unexpected")
- err = partition.AddNode(newNodeMaxResource("node-2", res), nil)
+ err = partition.AddNode(newNodeMaxResource("node-2", res))
assert.NilError(t, err, "test node2 add failed unexpected")
return partition
}
@@ -639,9 +639,9 @@ func createPreemptionQueuesNodes(t *testing.T) *PartitionContext {
assert.NilError(t, err, "test partition create failed with error")
res, err := resources.NewResourceFromConf(map[string]string{"vcore": "10"})
assert.NilError(t, err, "failed to create basic resource")
- err = partition.AddNode(newNodeMaxResource("node-1", res), nil)
+ err = partition.AddNode(newNodeMaxResource("node-1", res))
assert.NilError(t, err, "test node1 add failed unexpected")
- err = partition.AddNode(newNodeMaxResource("node-2", res), nil)
+ err = partition.AddNode(newNodeMaxResource("node-2", res))
assert.NilError(t, err, "test node2 add failed unexpected")
return partition
}
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index 9cca0dc6..b7c2bea4 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -623,9 +623,12 @@ func TestGetClusterUtilJSON(t *testing.T) {
ask2 := objects.NewAllocationAsk("alloc-2", appID, resAlloc2)
alloc1 := markAllocated(nodeID, ask1)
alloc2 := markAllocated(nodeID, ask2)
- allocs := []*objects.Allocation{alloc1, alloc2}
- err = partition.AddNode(node1, allocs)
+ err = partition.AddNode(node1)
assert.NilError(t, err, "add node to partition should not have failed")
+ err = partition.AddAllocation(alloc1)
+ assert.NilError(t, err, "failed to add alloc1")
+ err = partition.AddAllocation(alloc2)
+ assert.NilError(t, err, "failed to add alloc2")
// set expected result
utilMem := &dao.ClusterUtilDAOInfo{
@@ -684,12 +687,16 @@ func TestGetNodesUtilJSON(t *testing.T) {
ask1 := objects.NewAllocationAsk("alloc-1", app.ApplicationID, resAlloc1)
ask2 := objects.NewAllocationAsk("alloc-2", app.ApplicationID, resAlloc2)
allocs := []*objects.Allocation{markAllocated(node1.NodeID, ask1)}
- err = partition.AddNode(node1, allocs)
+ err = partition.AddNode(node1)
assert.NilError(t, err, "add node to partition should not have failed")
+ err = partition.AddAllocation(allocs[0])
+ assert.NilError(t, err, "add alloc-1 should not have failed")
allocs = []*objects.Allocation{markAllocated(node2.NodeID, ask2)}
- err = partition.AddNode(node2, allocs)
+ err = partition.AddNode(node2)
assert.NilError(t, err, "add node to partition should not have failed")
- err = partition.AddNode(node3, nil)
+ err = partition.AddAllocation(allocs[0])
+ assert.NilError(t, err, "add alloc-2 should not have failed")
+ err = partition.AddNode(node3)
assert.NilError(t, err, "add node to partition should not have failed")
// two nodes advertise memory: must show up in the list
@@ -800,7 +807,7 @@ func TestGetNodeUtilisation(t *testing.T) {
func addNode(t *testing.T, partition *scheduler.PartitionContext, nodeId string, resource *resources.Resource) *objects.Node {
nodeRes := resource.ToProto()
node := objects.NewNode(&si.NodeInfo{NodeID: nodeId, SchedulableResource: nodeRes})
- err := partition.AddNode(node, nil)
+ err := partition.AddNode(node)
assert.NilError(t, err, "adding node to partition should not fail")
return node
}
@@ -1009,11 +1016,15 @@ func TestPartitions(t *testing.T) {
ask1 := objects.NewAllocationAsk("alloc-1", app5.ApplicationID, resAlloc1)
ask2 := objects.NewAllocationAsk("alloc-2", app2.ApplicationID, resAlloc2)
allocs := []*objects.Allocation{markAllocated(node1ID, ask1)}
- err = defaultPartition.AddNode(node1, allocs)
+ err = defaultPartition.AddNode(node1)
assert.NilError(t, err, "add node to partition should not have failed")
+ err = defaultPartition.AddAllocation(allocs[0])
+ assert.NilError(t, err, "add alloc-1 should not have failed")
allocs = []*objects.Allocation{markAllocated(node2ID, ask2)}
- err = defaultPartition.AddNode(node2, allocs)
+ err = defaultPartition.AddNode(node2)
assert.NilError(t, err, "add node to partition should not have failed")
+ err = defaultPartition.AddAllocation(allocs[0])
+ assert.NilError(t, err, "add alloc-2 should not have failed")
req, err = http.NewRequest("GET", "/ws/v1/partitions", strings.NewReader(""))
assert.NilError(t, err, "App Handler request failed")
@@ -1285,11 +1296,16 @@ func TestGetPartitionNodes(t *testing.T) {
ask1 := objects.NewAllocationAsk("alloc-1", appID, resAlloc1)
ask2 := objects.NewAllocationAsk("alloc-2", appID, resAlloc2)
allocs := []*objects.Allocation{markAllocated(node1ID, ask1)}
- err = partition.AddNode(node1, allocs)
+ err = partition.AddNode(node1)
assert.NilError(t, err, "add node to partition should not have failed")
+ err = partition.AddAllocation(allocs[0])
+ assert.NilError(t, err, "add alloc-1 should not have failed")
+
allocs = []*objects.Allocation{markAllocated(node2ID, ask2)}
- err = partition.AddNode(node2, allocs)
+ err = partition.AddNode(node2)
assert.NilError(t, err, "add node to partition should not have failed")
+ err = partition.AddAllocation(allocs[0])
+ assert.NilError(t, err, "add alloc-2 should not have failed")
NewWebApp(schedulerContext, nil)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]