This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 65b114f0 [YUNIKORN-3228] Assess the impact of restart on scheduled
preemption and propose a solution (#1068)
65b114f0 is described below
commit 65b114f0389bf1ff8da126ef701886d3952266ec
Author: mani <[email protected]>
AuthorDate: Wed Apr 8 11:17:11 2026 +0530
[YUNIKORN-3228] Assess the impact of restart on scheduled preemption and
propose a solution (#1068)
Allow existing allocations coming out of the recovery process, VPA resizes for
existing running pods, and asks transitioned into allocations as usual, even if
this puts the usage over the queue's max resources. Either an already scheduled
quota preemption, or a configured quota preemption that has since completed,
would bring the usage back down once the delay expires.
Closes: #1068
Signed-off-by: mani <[email protected]>
---
pkg/scheduler/objects/application.go | 4 +-
pkg/scheduler/objects/application_test.go | 22 +-
pkg/scheduler/objects/preemption_utilities_test.go | 2 +-
pkg/scheduler/objects/queue.go | 32 +-
pkg/scheduler/objects/queue_test.go | 94 +-
pkg/scheduler/objects/quota_preemptor_test.go | 2 +-
pkg/scheduler/partition.go | 17 +-
pkg/scheduler/partition_test.go | 250 ++++++
pkg/scheduler/tests/recovery_test.go | 952 ++++++---------------
pkg/scheduler/tests/utilities_test.go | 16 +-
pkg/scheduler/utilities_test.go | 67 ++
11 files changed, 726 insertions(+), 732 deletions(-)
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index 1bf020c4..fd2636d8 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -685,7 +685,7 @@ func (sa *Application) AddAllocationAsk(ask *Allocation)
error {
// UpdateAllocationResources updates the app, queue, and user tracker with
deltas for an allocation.
// If an existing allocation cannot be found or alloc is invalid, an error is
returned.
-func (sa *Application) UpdateAllocationResources(alloc *Allocation) error {
+func (sa *Application) UpdateAllocationResources(alloc *Allocation,
isQuotaPreemptionEnabled bool) error {
sa.Lock()
defer sa.Unlock()
if alloc == nil {
@@ -711,7 +711,7 @@ func (sa *Application) UpdateAllocationResources(alloc
*Allocation) error {
// update allocated resources
sa.allocatedResource = resources.Add(sa.allocatedResource,
delta)
sa.allocatedResource.Prune()
- sa.queue.IncAllocatedResource(delta)
+ sa.queue.IncAllocatedResource(delta, isQuotaPreemptionEnabled)
// update user usage
sa.incUserResourceUsage(delta)
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index 663ca2ee..c3a3947a 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -1054,22 +1054,22 @@ func TestUpdateAllocationResourcePending(t *testing.T) {
assert.Check(t, resources.Equals(res, queue.GetPendingResource()),
"resources not on queue")
// check nil alloc update
- err = app.UpdateAllocationResources(nil)
+ err = app.UpdateAllocationResources(nil, false)
assert.Check(t, err != nil, "error not returned on nil alloc")
// check zero alloc update
zero := newAllocationWithKey(alloc, appID1, nodeID1,
resources.NewResource())
- err = app.UpdateAllocationResources(zero)
+ err = app.UpdateAllocationResources(zero, false)
assert.Check(t, err != nil, "error not returned on zero alloc")
// check missing alloc update
missing := newAllocationWithKey("missing", appID1, nodeID1, res)
- err = app.UpdateAllocationResources(missing)
+ err = app.UpdateAllocationResources(missing, false)
assert.Check(t, err != nil, "error not returned on missing alloc")
// check zero delta
same := newAllocationWithKey(alloc, appID1, nodeID1, res)
- err = app.UpdateAllocationResources(same)
+ err = app.UpdateAllocationResources(same, false)
assert.NilError(t, err, "error returned on same alloc size")
assert.Check(t, resources.Equals(res, app.GetPendingResource()),
"resources not on app")
assert.Check(t, resources.Equals(res, queue.GetPendingResource()),
"resources not on queue")
@@ -1078,7 +1078,7 @@ func TestUpdateAllocationResourcePending(t *testing.T) {
res2, err := resources.NewResourceFromConf(map[string]string{"first":
"3"})
assert.NilError(t, err, "failed to create resource with error")
inc := newAllocationWithKey(alloc, appID1, nodeID1, res2)
- err = app.UpdateAllocationResources(inc)
+ err = app.UpdateAllocationResources(inc, false)
assert.NilError(t, err, "error returned on incremented alloc")
assert.Check(t, resources.Equals(res2, app.GetPendingResource()),
"resources not updated on app")
assert.Check(t, resources.Equals(res2, queue.GetPendingResource()),
"resources not updated on queue")
@@ -1096,30 +1096,30 @@ func TestUpdateAllocationResourceAllocated(t
*testing.T) {
res, err := resources.NewResourceFromConf(map[string]string{"first":
"2"})
assert.NilError(t, err, "failed to create resource with error")
alloc1 := newAllocationWithKey(alloc, appID1, nodeID1, res)
- queue.IncAllocatedResource(res)
+ queue.IncAllocatedResource(res, false)
app.RecoverAllocationAsk(alloc1)
app.AddAllocation(alloc1)
assert.Check(t, resources.Equals(res, queue.GetAllocatedResource()),
"resources not on queue")
// check nil alloc update
- err = app.UpdateAllocationResources(nil)
+ err = app.UpdateAllocationResources(nil, false)
assert.Check(t, err != nil, "error not returned on nil alloc")
// check zero alloc update
zero := newAllocationWithKey(alloc, appID1, nodeID1,
resources.NewResource())
- err = app.UpdateAllocationResources(zero)
+ err = app.UpdateAllocationResources(zero, false)
assert.Check(t, err != nil, "error not returned on zero alloc")
// check missing alloc update
missing := newAllocationWithKey("missing", appID1, nodeID1, res)
- err = app.UpdateAllocationResources(missing)
+ err = app.UpdateAllocationResources(missing, false)
assert.Check(t, err != nil, "error not returned on missing alloc")
assert.Check(t, resources.Equals(res, app.GetAllocatedResource()),
"resources not on app")
assert.Check(t, resources.Equals(res, queue.GetAllocatedResource()),
"resources not on queue")
// check zero delta
same := newAllocationWithKey(alloc, appID1, nodeID1, res)
- err = app.UpdateAllocationResources(same)
+ err = app.UpdateAllocationResources(same, false)
assert.NilError(t, err, "error returned on same alloc size")
assert.Check(t, resources.Equals(res, app.GetAllocatedResource()),
"resources not on app")
assert.Check(t, resources.Equals(res, queue.GetAllocatedResource()),
"resources not on queue")
@@ -1128,7 +1128,7 @@ func TestUpdateAllocationResourceAllocated(t *testing.T) {
res2, err := resources.NewResourceFromConf(map[string]string{"first":
"3"})
assert.NilError(t, err, "failed to create resource with error")
inc := newAllocationWithKey(alloc, appID1, nodeID1, res2)
- err = app.UpdateAllocationResources(inc)
+ err = app.UpdateAllocationResources(inc, false)
assert.NilError(t, err, "error returned on incremented alloc")
assert.Check(t, resources.Equals(res2, app.GetAllocatedResource()),
"resources not updated on app")
assert.Check(t, resources.Equals(res2, queue.GetAllocatedResource()),
"resources not updated on queue")
diff --git a/pkg/scheduler/objects/preemption_utilities_test.go
b/pkg/scheduler/objects/preemption_utilities_test.go
index 9574af49..0e477455 100644
--- a/pkg/scheduler/objects/preemption_utilities_test.go
+++ b/pkg/scheduler/objects/preemption_utilities_test.go
@@ -149,7 +149,7 @@ func assignAllocationsToQueue(allocations []*Allocation,
queue *Queue) {
app = queue.applications[allocation.applicationID]
}
app.AddAllocation(allocation)
- queue.IncAllocatedResource(allocation.GetAllocatedResource())
+ queue.IncAllocatedResource(allocation.GetAllocatedResource(),
false)
}
}
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 37792fb2..b20a2622 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -496,6 +496,15 @@ func (sq *Queue) setPreemptionTime(oldMaxResource
*resources.Resource, oldDelay
}
}
+// ResetPreemptionTime reset the quota preemption variables
+// Only for testing
+func (sq *Queue) ResetPreemptionTime() {
+ sq.Lock()
+ defer sq.Unlock()
+ sq.quotaPreemptionStartTime = time.Time{}
+ sq.quotaPreemptionDelay = 0
+}
+
// shouldTriggerPreemption returns true if quota preemption should be
triggered based on the settings and the
// current time.
func (sq *Queue) shouldTriggerPreemption() bool {
@@ -1259,20 +1268,39 @@ func (sq *Queue) TryIncAllocatedResource(alloc
*resources.Resource) error {
}
// IncAllocatedResource increments the allocated resources for this queue
(recursively). No queue limits are checked.
-func (sq *Queue) IncAllocatedResource(alloc *resources.Resource) {
+// In case any quota preemption already scheduled or any quota preemption
delay is configured, set the same delay again
+// so that any usage overflow over the max quota caused by alloc would be
brought down when the delay expires.
+func (sq *Queue) IncAllocatedResource(alloc *resources.Resource,
isQuotaPreemptionEnabled bool) {
// fall through if nil
if sq == nil {
return
}
// update parent
- sq.parent.IncAllocatedResource(alloc)
+ sq.parent.IncAllocatedResource(alloc, isQuotaPreemptionEnabled)
// update this queue
sq.Lock()
defer sq.Unlock()
sq.allocatedResource = resources.Add(sq.allocatedResource, alloc)
sq.updateAllocatedResourceMetrics()
+
+ // Should apply quota preemption based on the config?
+ if !isQuotaPreemptionEnabled ||
+ !sq.quotaPreemptionStartTime.IsZero() ||
+ !sq.isManaged ||
+ sq.quotaPreemptionDelay == 0 ||
+ resources.IsZero(sq.maxResource) ||
+
sq.maxResource.StrictlyGreaterThanOrEqualsOnlyExisting(sq.allocatedResource) {
+ return
+ }
+ // Override the earlier set quota preemption time with the configured
delay.
+ // Delay clock ticking from now.
+ sq.quotaPreemptionStartTime = time.Now().Add(sq.quotaPreemptionDelay)
+ log.Log(log.SchedQueue).Info("Overridden quota preemption time",
+ zap.String("queue", sq.QueuePath),
+ zap.Duration("delay", sq.quotaPreemptionDelay),
+ zap.Time("quotaPreemptionStartTime",
sq.quotaPreemptionStartTime))
}
// allocatedResFits adds the passed in resource to the allocatedResource of
the queue and checks if it still fits in the
diff --git a/pkg/scheduler/objects/queue_test.go
b/pkg/scheduler/objects/queue_test.go
index dd00d46e..b998b9ac 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -668,8 +668,8 @@ func TestHeadroom(t *testing.T) {
var res *resources.Resource
res, err = resources.NewResourceFromConf(map[string]string{"first":
"5", "second": "3"})
assert.NilError(t, err, "failed to create resource")
- leaf1.IncAllocatedResource(res)
- leaf2.IncAllocatedResource(res)
+ leaf1.IncAllocatedResource(res, false)
+ leaf2.IncAllocatedResource(res, false)
// headRoom root should be this (max 20-10 - alloc 10-6)
res, err = resources.NewResourceFromConf(map[string]string{"first":
"10", "second": "4"})
@@ -726,10 +726,10 @@ func TestHeadroomMerge(t *testing.T) {
var res *resources.Resource
res, err = resources.NewResourceFromConf(map[string]string{"first":
"5", "second": "5", "third": "5"})
assert.NilError(t, err, "failed to create resource")
- leaf1.IncAllocatedResource(res)
+ leaf1.IncAllocatedResource(res, false)
res, err = resources.NewResourceFromConf(map[string]string{"third":
"5", "fourth": "5"})
assert.NilError(t, err, "failed to create resource")
- leaf2.IncAllocatedResource(res)
+ leaf2.IncAllocatedResource(res, false)
// root headroom should be nil
headRoom := root.getHeadRoom()
@@ -796,8 +796,8 @@ func TestMaxHeadroomNoMax(t *testing.T) {
var res *resources.Resource
res, err = resources.NewResourceFromConf(map[string]string{"first":
"5", "second": "3"})
assert.NilError(t, err, "failed to create resource")
- leaf1.IncAllocatedResource(res)
- leaf2.IncAllocatedResource(res)
+ leaf1.IncAllocatedResource(res, false)
+ leaf2.IncAllocatedResource(res, false)
headRoom = root.getMaxHeadRoom()
assert.Assert(t, headRoom == nil, "headRoom of root should be nil
because no max set for all queues")
@@ -834,8 +834,8 @@ func TestMaxHeadroomMax(t *testing.T) {
var res *resources.Resource
res, err = resources.NewResourceFromConf(map[string]string{"first":
"5", "second": "3"})
assert.NilError(t, err, "failed to create resource")
- leaf1.IncAllocatedResource(res)
- leaf2.IncAllocatedResource(res)
+ leaf1.IncAllocatedResource(res, false)
+ leaf2.IncAllocatedResource(res, false)
// root headroom should be nil
headRoom := root.getMaxHeadRoom()
@@ -1475,7 +1475,7 @@ func TestOutStandingRequestMultipleChildrenWithMax(t
*testing.T) {
leaf1App := newApplication("app-leaf1", "default", "root.parent.leaf1")
leaf1App.SetQueue(leaf1)
leaf1.AddApplication(leaf1App)
- leaf1.IncAllocatedResource(allocatedRes)
+ leaf1.IncAllocatedResource(allocatedRes, false)
// use priority = 1000 for this ask to force ordering of queues when
sorting
askLeaf1 := newAllocationAskAll("ask-leaf1", "app-leaf1", "", askRes,
false, 1000)
askLeaf1.SetSchedulingAttempted(true)
@@ -1493,7 +1493,7 @@ func TestOutStandingRequestMultipleChildrenWithMax(t
*testing.T) {
err = leaf2App.AddAllocationAsk(ask2Leaf2)
assert.NilError(t, err, "could not add ask")
leaf2.AddApplication(leaf2App)
- leaf2.IncAllocatedResource(allocatedRes)
+ leaf2.IncAllocatedResource(allocatedRes, false)
outstanding := root.GetOutstandingRequests()
assert.Equal(t, 2, len(outstanding), "expected 2 outstanding requests
to be collected")
@@ -2374,11 +2374,12 @@ func TestQuotaPreemptionSettings(t *testing.T) {
assert.NilError(t, err, "failed to create basic queue: %v", err)
testCases := []struct {
- name string
- maxRes map[string]string
- conf configs.QueueConfig
- oldDelay time.Duration
- timeChange bool
+ name string
+ maxRes map[string]string
+ conf configs.QueueConfig
+ oldDelay time.Duration
+ timeChange bool
+ shouldApplyQuotaPreemption bool
}{
{"clearing max",
map[string]string{"memory": "500"},
@@ -2386,7 +2387,7 @@ func TestQuotaPreemptionSettings(t *testing.T) {
Resources: configs.Resources{
Max: nil,
},
- }, 0, false},
+ }, 0, false, false},
{"clearing max with delay",
map[string]string{"memory": "500"},
configs.QueueConfig{
@@ -2394,7 +2395,7 @@ func TestQuotaPreemptionSettings(t *testing.T) {
Max: nil,
},
Properties:
map[string]string{configs.QuotaPreemptionDelay: "50ms"},
- }, 0, false},
+ }, 0, false, false},
{"incorrect delay",
map[string]string{"memory": "500"},
configs.QueueConfig{
@@ -2402,7 +2403,7 @@ func TestQuotaPreemptionSettings(t *testing.T) {
Max: nil,
},
Properties:
map[string]string{configs.QuotaPreemptionDelay: "-50s"},
- }, 0, false},
+ }, 0, false, false},
{"increase max with delay",
map[string]string{"memory": "500"},
configs.QueueConfig{
@@ -2410,7 +2411,7 @@ func TestQuotaPreemptionSettings(t *testing.T) {
Max: map[string]string{"memory":
"1000"},
},
Properties:
map[string]string{configs.QuotaPreemptionDelay: "50ms"},
- }, 50 * time.Millisecond, false},
+ }, 50 * time.Millisecond, false, false},
{"decrease max with delay",
map[string]string{"memory": "500"},
configs.QueueConfig{
@@ -2418,7 +2419,7 @@ func TestQuotaPreemptionSettings(t *testing.T) {
Max: map[string]string{"memory": "100"},
},
Properties:
map[string]string{configs.QuotaPreemptionDelay: "50ms"},
- }, 50 * time.Millisecond, true},
+ }, 50 * time.Millisecond, true, true},
{"delay changed from 0 no max change",
map[string]string{"memory": "500"},
configs.QueueConfig{
@@ -2426,7 +2427,7 @@ func TestQuotaPreemptionSettings(t *testing.T) {
Max: map[string]string{"memory": "500"},
},
Properties:
map[string]string{configs.QuotaPreemptionDelay: "50ms"},
- }, 0, true},
+ }, 0, true, true},
}
var oldMax *resources.Resource
@@ -3266,3 +3267,54 @@ func TestQueue_setPreemptionTime(t *testing.T) {
})
}
}
+
+func TestOverrideAndResetPreemptionTime(t *testing.T) {
+ root, e := createRootQueue(nil)
+ assert.NilError(t, e, "failed to create basic root queue")
+ tests := []struct {
+ name string
+ oldMaxResource *resources.Resource
+ maxRes map[string]string
+ currentUsage *resources.Resource
+ delay time.Duration
+ timeChange bool
+ overrideTimeChange bool
+ }{
+ {"usage lesser than max res, so preemption time is not set. try
to override",
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}),
map[string]string{"test": "150"},
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 80}), 5,
false, false},
+ {"setting preemption time first, try to override",
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}),
map[string]string{"test": "50"},
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 80}), 10,
true, false},
+ {"preemption time did not set first time, try to override",
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 100}),
map[string]string{"test": "150"},
resources.NewResourceFromMap(map[string]resources.Quantity{"test": 180}), 10,
false, true},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ queue, err := createManagedQueue(root, "test", false,
tt.maxRes)
+ assert.NilError(t, err, "queue creation failed
unexpectedly")
+ queue.quotaPreemptionDelay = tt.delay
+ queue.allocatedResource = tt.currentUsage
+ before := queue.quotaPreemptionStartTime
+ queue.setPreemptionTime(tt.oldMaxResource, tt.delay)
+ after := queue.quotaPreemptionStartTime
+ if tt.timeChange {
+ assert.Assert(t, !before.Equal(after), "time
change is expected")
+ assert.Assert(t,
!queue.quotaPreemptionStartTime.IsZero(), "schedule is expected")
+ }
+
queue.IncAllocatedResource(resources.NewResourceFromMap(map[string]resources.Quantity{"test":
1}), true)
+ afterOverride := queue.quotaPreemptionStartTime
+ if tt.overrideTimeChange {
+ assert.Assert(t, !after.Equal(afterOverride),
"time change is not expected")
+ assert.Assert(t,
!queue.quotaPreemptionStartTime.IsZero(), "schedule is expected")
+ }
+ if tt.timeChange && !tt.overrideTimeChange {
+ assert.Assert(t, after.Equal(afterOverride),
"time change is not expected")
+ assert.Assert(t,
!queue.quotaPreemptionStartTime.IsZero(), "schedule is expected")
+ }
+ if tt.timeChange || tt.overrideTimeChange {
+ queue.ResetPreemptionTime()
+ afterReset := queue.quotaPreemptionStartTime
+ assert.Assert(t,
!afterOverride.Equal(afterReset), "time change is expected")
+ assert.Assert(t, afterReset.IsZero(), "time
change is expected")
+ assert.Assert(t, queue.quotaPreemptionDelay ==
0, "time change is expected")
+ assert.Assert(t,
queue.quotaPreemptionStartTime.IsZero(), "schedule is not expected")
+ }
+ })
+ }
+}
diff --git a/pkg/scheduler/objects/quota_preemptor_test.go
b/pkg/scheduler/objects/quota_preemptor_test.go
index d3dff3ed..8f7584e9 100644
--- a/pkg/scheduler/objects/quota_preemptor_test.go
+++ b/pkg/scheduler/objects/quota_preemptor_test.go
@@ -65,7 +65,7 @@ func TestQuotaChangeGetPreemptableResource(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
tc.queue.parent.guaranteedResource = tc.parentGuaranteed
tc.queue.maxResource = tc.maxResource
- tc.queue.IncAllocatedResource(tc.usedResource)
+ tc.queue.IncAllocatedResource(tc.usedResource, false)
preemptor := NewQuotaPreemptor(tc.queue)
preemptor.setPreemptableResources()
assert.Equal(t,
resources.Equals(preemptor.preemptableResource, tc.preemptable), true)
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 82c398e0..3be1bc7c 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -1236,7 +1236,10 @@ func (pc *PartitionContext) UpdateAllocation(alloc
*objects.Allocation) (request
zap.String("appID", applicationID),
zap.String("allocationKey", allocationKey))
- queue.IncAllocatedResource(res)
+ // Increase the queue resource usage at any cost even if
allocation put up the usage over the max resources.
+ // Quota preemption delay (only if set) used before restart
cannot be adjusted based on the lost time after restart.
+ // So, override the quota preemption time with the configured
delay again so that preemption would be triggerred once the delay expires.
+ queue.IncAllocatedResource(res, pc.IsQuotaPreemptionEnabled())
metrics.GetQueueMetrics(queue.GetQueuePath()).IncAllocatedContainer()
node.AddAllocation(alloc)
alloc.SetInstanceType(node.GetInstanceType())
@@ -1271,7 +1274,11 @@ func (pc *PartitionContext) UpdateAllocation(alloc
*objects.Allocation) (request
delta.Prune()
if !resources.IsZero(delta) && !resources.IsZero(newResource) {
// resources have changed, update them on application, which
also handles queue and user tracker updates
- if err := app.UpdateAllocationResources(alloc); err != nil {
+ // Increase the queue resource usage at any cost even if
changed (+ve) resources put up the usage over the max resources.
+ // In case quota preemption set but not completed, usage would
be brought down as part of enforcement through preemption when the already set
delay expires.
+ // In case quota preemption set and completed already,
configured delay would be set again and
+ // usage would be brought down when the newly set delay expires.
+ if err := app.UpdateAllocationResources(alloc,
pc.IsQuotaPreemptionEnabled()); err != nil {
metrics.GetSchedulerMetrics().IncSchedulingError()
return false, false, fmt.Errorf("cannot update alloc
resources on application %s: %v ",
alloc.GetApplicationID(), err)
@@ -1301,7 +1308,11 @@ func (pc *PartitionContext) UpdateAllocation(alloc
*objects.Allocation) (request
return false, false, err
}
- queue.IncAllocatedResource(alloc.GetAllocatedResource())
+ // Increase the queue resource usage at any cost even if ask
accommodated earlier based on the old max resources causes usage overflow based
on the current max resources now.
+ // In case quota preemption set but not completed, usage would
be brought down as part of enforcement through preemption when the already set
delay expires.
+ // In case quota preemption set and completed already,
configured delay would be set again and
+ // usage would be brought down when the newly set delay expires.
+ queue.IncAllocatedResource(alloc.GetAllocatedResource(),
pc.IsQuotaPreemptionEnabled())
metrics.GetQueueMetrics(queue.GetQueuePath()).IncAllocatedContainer()
node.AddAllocation(existing)
existing.SetInstanceType(node.GetInstanceType())
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index e62e501a..33bb49d4 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -21,6 +21,7 @@ package scheduler
import (
"fmt"
"strconv"
+ "strings"
"testing"
"time"
@@ -34,6 +35,7 @@ import (
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/mock"
"github.com/apache/yunikorn-core/pkg/plugins"
+ "github.com/apache/yunikorn-core/pkg/rmproxy"
"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-core/pkg/scheduler/policies"
@@ -3646,6 +3648,130 @@ func TestUpdateAllocation(t *testing.T) {
assert.Check(t, !allocCreated, "alloc should not have been created")
}
+func TestUpdateAllocationWithQuotaPreemption(t *testing.T) {
+ setupUGM()
+ partition := createQuotaPreemptionQueuesNodes(t)
+ leafQueueConf :=
[]configs.QueueConfig{createLeafQueueConfig(map[string]string{"memory": "5",
"vcore": "5"}, map[string]string{configs.QuotaPreemptionDelay: "1s"})}
+ leafQueueConf1 :=
[]configs.QueueConfig{createLeafQueueConfig(map[string]string{"memory": "5",
"vcore": "5"}, nil)}
+ leafQueueConf2 :=
[]configs.QueueConfig{createLeafQueueConfig(map[string]string{"memory": "5",
"vcore": "5"}, map[string]string{configs.QuotaPreemptionDelay: "0s"})}
+ leafQueueConf3 := []configs.QueueConfig{createLeafQueueConfig(nil,
map[string]string{configs.QuotaPreemptionDelay: "10s"})}
+ leafQueueConf4 :=
[]configs.QueueConfig{createLeafQueueConfig(map[string]string{"memory": "12",
"vcore": "12"}, map[string]string{configs.QuotaPreemptionDelay: "1s"})}
+ allocRes := &objects.AllocationResult{ResultType: objects.Allocated}
+ tests := []struct {
+ name string
+ partitionContext *PartitionContext
+ leafQueueConfig []configs.QueueConfig
+ allocResult *objects.AllocationResult
+ releasedEvents int
+ }{
+ {"preemption enabled at partition level", partition,
leafQueueConf, nil, 1},
+ {"preemption enabled at partition level but not at queue level,
delay not defined explicitly", partition, leafQueueConf1, nil, 0},
+ {"preemption enabled at partition level but not at queue level,
delay defined as 0s", partition, leafQueueConf2, nil, 0},
+ {"preemption enabled at partition level but max resources not
defined at queue level", partition, leafQueueConf3, allocRes, 0},
+ {"preemption enabled at partition level but max resources
higher than usage at queue level", partition, leafQueueConf4, allocRes, 0},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ partition = tt.partitionContext
+
+ var testHandler *rmproxy.MockedRMProxy
+ // add the app to add an ask to
+ app, testHandler := newApplicationWithHandler(appID1,
"default", "root.leaf")
+ err := partition.AddApplication(app)
+ assert.NilError(t, err, "app-1 should have been added
to the partition")
+
+ // Add simple allocations
+ var i int
+ maxAllocs := 5
+ for i = 1; i <= maxAllocs; i++ {
+ var res *resources.Resource
+ res, err =
resources.NewResourceFromConf(map[string]string{"vcore": "2"})
+ assert.NilError(t, err, "failed to create
resource")
+ alloc := si.Allocation{
+ AllocationKey: "ask-key-" +
strconv.Itoa(i),
+ ApplicationID: appID1,
+ NodeID: nodeID1,
+ ResourcePerAlloc: res.ToProto(),
+ }
+ _, allocCreated, allocCreatedErr :=
partition.UpdateAllocation(objects.NewAllocationFromSI(&alloc))
+ assert.NilError(t, allocCreatedErr, "failed to
add alloc to app")
+ assert.Check(t, allocCreated, "alloc should
have been created")
+ }
+ totalRes, err :=
resources.NewResourceFromConf(map[string]string{"vcore": "10"})
+ assert.NilError(t, err, "failed to create resource")
+ if !resources.Equals(app.GetAllocatedResource(),
totalRes) {
+ t.Fatal("app not updated by adding alloc, no
error thrown")
+ }
+
+ // There is a queue setup as the config must be valid
when we run
+ root := partition.GetQueue("root")
+ if root == nil {
+ t.Error("root queue not found in partition")
+ }
+
+ err = partition.updateQueues(tt.leafQueueConfig, root)
+ assert.NilError(t, err, "config should have been
updated")
+
+ leaf := partition.GetQueue("root.leaf")
+ if leaf == nil {
+ t.Error("root queue not found in partition")
+ }
+ assert.Equal(t,
len(leaf.GetApplication(appID1).GetAllAllocations()), maxAllocs)
+
+ // update to existing alloc with changed resources
+ res, err :=
resources.NewResourceFromConf(map[string]string{"vcore": "3"})
+ assert.NilError(t, err, "failed to create resource")
+ alloc := si.Allocation{
+ AllocationKey: "ask-key-1",
+ ApplicationID: appID1,
+ NodeID: nodeID1,
+ ResourcePerAlloc: res.ToProto(),
+ }
+ _, allocCreated, err :=
partition.UpdateAllocation(objects.NewAllocationFromSI(&alloc))
+ assert.NilError(t, err, "failed to update alloc on app")
+ assert.Check(t, !allocCreated, "alloc should not have
been created")
+
+ totalRes, err =
resources.NewResourceFromConf(map[string]string{"vcore": "11"})
+ assert.NilError(t, err, "failed to create resource")
+ if !resources.Equals(app.GetAllocatedResource(),
totalRes) {
+ t.Fatal("app not updated with new resources")
+ }
+ assert.Equal(t,
len(leaf.GetApplication(appID1).GetAllAllocations()), 5)
+
+ res, err =
resources.NewResourceFromConf(map[string]string{"vcore": "1"})
+ assert.NilError(t, err, "failed to create resource")
+ err =
app.AddAllocationAsk(newAllocationAsk("ask-key-6", appID1, res))
+ assert.NilError(t, err, "failed to add ask ask-key-2 to
app-1")
+
+ // delay so that preemption delay of 1 sec expires
+ time.Sleep(1100 * time.Millisecond)
+
+ result := partition.tryAllocate()
+
+ // delay so that events are sent out
+ time.Sleep(100 * time.Millisecond)
+
+ if tt.allocResult == nil {
+ events := testHandler.GetEvents()
+ eventsCount := 0
+ for _, event := range events {
+ if allocRelease, ok :=
event.(*rmevent.RMReleaseAllocationEvent); ok {
+ assert.Equal(t,
len(allocRelease.ReleasedAllocations), 3)
+ assert.Equal(t,
allocRelease.ReleasedAllocations[0].GetTerminationType(),
si.TerminationType_PREEMPTED_BY_SCHEDULER, "")
+ assert.Assert(t,
strings.Contains(allocRelease.ReleasedAllocations[0].AllocationKey, "ask-key"),
"wrong allocation released on quota preemption")
+ eventsCount++
+ }
+ }
+ assert.Equal(t, eventsCount, tt.releasedEvents,
"unexpected release events count")
+ } else {
+ assert.Equal(t, result.ResultType,
objects.Allocated)
+ }
+ leaf.ResetPreemptionTime()
+ partition.removeApplication(appID1)
+ })
+ }
+}
+
func TestUpdateAllocationWithAsk(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
@@ -3716,6 +3842,130 @@ func TestUpdateAllocationWithAsk(t *testing.T) {
assert.Check(t, !askCreated, "ask should not have been created")
}
+func TestUpdateAllocationWithAskAndQuotaPreemption(t *testing.T) {
+ setupUGM()
+ partition := createQuotaPreemptionQueuesNodes(t)
+ leafQueueConf :=
[]configs.QueueConfig{createLeafQueueConfig(map[string]string{"memory": "5",
"vcore": "5"}, map[string]string{configs.QuotaPreemptionDelay: "1s"})}
+ leafQueueConf1 :=
[]configs.QueueConfig{createLeafQueueConfig(map[string]string{"memory": "5",
"vcore": "5"}, nil)}
+ leafQueueConf2 :=
[]configs.QueueConfig{createLeafQueueConfig(map[string]string{"memory": "5",
"vcore": "5"}, map[string]string{configs.QuotaPreemptionDelay: "0s"})}
+ leafQueueConf3 := []configs.QueueConfig{createLeafQueueConfig(nil,
map[string]string{configs.QuotaPreemptionDelay: "10s"})}
+ leafQueueConf4 :=
[]configs.QueueConfig{createLeafQueueConfig(map[string]string{"memory": "12",
"vcore": "12"}, map[string]string{configs.QuotaPreemptionDelay: "1s"})}
+ allocRes := &objects.AllocationResult{ResultType: objects.Allocated}
+ tests := []struct {
+ name string
+ partitionContext *PartitionContext
+ leafQueueConfig []configs.QueueConfig
+ allocResult *objects.AllocationResult
+ releasedEvents int
+ }{
+ {"preemption enabled at partition level", partition,
leafQueueConf, nil, 1},
+ {"preemption enabled at partition level but not at queue level,
delay not defined explicitly", partition, leafQueueConf1, nil, 0},
+ {"preemption enabled at partition level but not at queue level,
delay defined as 0s", partition, leafQueueConf2, nil, 0},
+ {"preemption enabled at partition level but max resources not
defined at queue level", partition, leafQueueConf3, allocRes, 0},
+ {"preemption enabled at partition level but max resources
higher than usage at queue level", partition, leafQueueConf4, allocRes, 0},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ partition = tt.partitionContext
+ var testHandler *rmproxy.MockedRMProxy
+
+ // add the app to add an ask to
+ app, testHandler := newApplicationWithHandler(appID1,
"default", "root.leaf")
+ err := partition.AddApplication(app)
+ assert.NilError(t, err, "app-1 should have been added
to the partition")
+
+ // Add simple allocations
+ var i int
+ maxAllocs := 4
+ for i = 1; i <= maxAllocs; i++ {
+ var res *resources.Resource
+ res, err =
resources.NewResourceFromConf(map[string]string{"vcore": "2"})
+ assert.NilError(t, err, "failed to create
resource")
+ alloc := si.Allocation{
+ AllocationKey: "ask-key-" +
strconv.Itoa(i),
+ ApplicationID: appID1,
+ NodeID: nodeID1,
+ ResourcePerAlloc: res.ToProto(),
+ }
+ _, allocCreated, allocCreatedErr :=
partition.UpdateAllocation(objects.NewAllocationFromSI(&alloc))
+ assert.NilError(t, allocCreatedErr, "failed to
add alloc to app")
+ assert.Check(t, allocCreated, "alloc should
have been created")
+ }
+ totalRes, err :=
resources.NewResourceFromConf(map[string]string{"vcore": "8"})
+ assert.NilError(t, err, "failed to create resource")
+ if !resources.Equals(app.GetAllocatedResource(),
totalRes) {
+ t.Fatal("app not updated by adding alloc, no
error thrown")
+ }
+
+ // a simple ask
+ var res *resources.Resource
+ res, err =
resources.NewResourceFromConf(map[string]string{"vcore": "2"})
+ assert.NilError(t, err, "failed to create resource")
+ ask := si.Allocation{
+ AllocationKey: "ask-key-5",
+ ApplicationID: appID1,
+ ResourcePerAlloc: res.ToProto(),
+ }
+ askCreated, _, err :=
partition.UpdateAllocation(objects.NewAllocationFromSI(&ask))
+ assert.NilError(t, err, "failed to add ask to app")
+ assert.Check(t, askCreated, "ask should have been
created")
+ if !resources.Equals(app.GetPendingResource(), res) {
+ t.Fatal("app not updated by adding ask, no
error thrown")
+ }
+ root := partition.GetQueue("root")
+			if root == nil {
+				t.Fatal("root queue not found in partition")
+			}
+ err = partition.updateQueues(tt.leafQueueConfig, root)
+ assert.NilError(t, err, "config should have been
updated")
+
+			leaf := partition.GetQueue("root.leaf")
+			if leaf == nil {
+				t.Fatal("root.leaf queue not found in partition")
+			}
+
+ // transition to allocated
+ alloc := si.Allocation{
+ AllocationKey: "ask-key-6",
+ ApplicationID: appID1,
+ NodeID: nodeID1,
+ ResourcePerAlloc: res.ToProto(),
+ }
+ askCreated, allocCreated, err :=
partition.UpdateAllocation(objects.NewAllocationFromSI(&alloc))
+ assert.NilError(t, err, "failed to transition ask to
allocated")
+ assert.Check(t, !askCreated, "ask should not have been
created")
+ assert.Check(t, allocCreated, "alloc should have been
created")
+ assert.Equal(t,
len(leaf.GetApplication(appID1).GetAllAllocations()), 5)
+
+ // delay so that preemption delay of 1 sec expires
+ time.Sleep(1100 * time.Millisecond)
+
+ result := partition.tryAllocate()
+
+ // delay so that events are sent out
+ time.Sleep(100 * time.Millisecond)
+
+ if tt.allocResult == nil {
+ events := testHandler.GetEvents()
+ eventsCount := 0
+ for _, event := range events {
+ if allocRelease, ok :=
event.(*rmevent.RMReleaseAllocationEvent); ok {
+ assert.Equal(t,
len(allocRelease.ReleasedAllocations), 2)
+ assert.Equal(t,
allocRelease.ReleasedAllocations[0].GetTerminationType(),
si.TerminationType_PREEMPTED_BY_SCHEDULER, "")
+ assert.Assert(t,
strings.Contains(allocRelease.ReleasedAllocations[0].AllocationKey, "ask-key"),
"wrong allocation released on quota preemption")
+ eventsCount++
+ }
+ }
+ assert.Equal(t, eventsCount, tt.releasedEvents,
"unexpected release events count")
+ } else {
+ assert.Equal(t, result.ResultType,
objects.Allocated)
+ }
+ leaf.ResetPreemptionTime()
+ partition.removeApplication(appID1)
+ })
+ }
+}
+
func TestRemoveAllocationAsk(t *testing.T) {
setupUGM()
partition, err := newBasePartition()
diff --git a/pkg/scheduler/tests/recovery_test.go
b/pkg/scheduler/tests/recovery_test.go
index 46248043..a2874b6b 100644
--- a/pkg/scheduler/tests/recovery_test.go
+++ b/pkg/scheduler/tests/recovery_test.go
@@ -20,12 +20,13 @@ package tests
import (
"fmt"
+ "strings"
"testing"
"gotest.tools/v3/assert"
+ "github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
- "github.com/apache/yunikorn-core/pkg/entrypoint"
"github.com/apache/yunikorn-core/pkg/scheduler"
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
@@ -49,7 +50,6 @@ partitions:
vcore: 20
`
-//nolint:funlen
func TestSchedulerRecovery(t *testing.T) {
// --------------------------------------------------
// Phase 1) Fresh start
@@ -57,96 +57,16 @@ func TestSchedulerRecovery(t *testing.T) {
ms := &mockScheduler{}
defer ms.Stop()
- err := ms.Init(configData, false, false)
- assert.NilError(t, err, "RegisterResourceManager failed")
-
- // Check queues of scheduler.GetClusterContext() and scheduler.
- part := ms.scheduler.GetClusterContext().GetPartition("[rm:123]default")
- assert.Assert(t, nil == part.GetTotalPartitionResource())
-
- // Check scheduling queue root
- rootQ := part.GetQueue("root")
- if rootQ == nil {
- t.Fatal("root queue not found on partition")
- }
- assert.Assert(t, nil == rootQ.GetMaxResource())
-
- // Check scheduling queue a
- queue := part.GetQueue("root.a")
- assert.Equal(t, resources.Quantity(150),
queue.GetMaxResource().Resources[common.Memory])
-
- // Register nodes, and add apps
- err = ms.proxy.UpdateNode(&si.NodeRequest{
- Nodes: []*si.NodeInfo{
- {
- NodeID: "node-1:1234",
- Attributes: map[string]string{},
- SchedulableResource: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 100},
- "vcore": {Value: 20},
- },
- },
- Action: si.NodeInfo_CREATE,
- },
- {
- NodeID: "node-2:1234",
- Attributes: map[string]string{},
- SchedulableResource: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 100},
- "vcore": {Value: 20},
- },
- },
- Action: si.NodeInfo_CREATE,
- },
- },
- RmID: "rm:123",
- })
-
- assert.NilError(t, err, "NodeRequest failed")
- ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
- ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
-
- // Add two apps and wait for them to be accepted
- err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
- New: newAddAppRequest(map[string]string{appID1: "root.a"}),
- RmID: "rm:123",
- })
- assert.NilError(t, err, "ApplicationRequest failed")
- ms.mockRM.waitForAcceptedApplication(t, appID1, 1000)
+ _, rootQ, queueA := doRecoverySetup(t, configData, ms, true, false,
[]string{"node-1:1234", "node-2:1234"}, true, []string{appID1}, nil)
+ assert.Equal(t, resources.Quantity(150),
queueA.GetMaxResource().Resources[common.Memory])
// Get scheduling app
app := ms.getApplication(appID1)
- // Verify app initial state
- var app01 *objects.Application
- app01, err = getApplication(part, appID1)
- assert.NilError(t, err, "app not found on partition")
- assert.Equal(t, app01.CurrentState(), objects.New.String())
-
- err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ err := ms.proxy.UpdateAllocation(&si.AllocationRequest{
Allocations: []*si.Allocation{
- {
- AllocationKey: "alloc-0",
- ResourcePerAlloc: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 10},
- "vcore": {Value: 1},
- },
- },
- ApplicationID: appID1,
- },
- {
- AllocationKey: "alloc-1",
- ResourcePerAlloc: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 10},
- "vcore": {Value: 1},
- },
- },
- ApplicationID: appID1,
- },
+ createAllocation("alloc-0", "", appID1, 10, 1, "",
false),
+ createAllocation("alloc-1", "", appID1, 10, 1, "",
false),
},
RmID: "rm:123",
})
@@ -155,27 +75,26 @@ func TestSchedulerRecovery(t *testing.T) {
// Wait pending resource of queue a and scheduler queue
// Both pending memory = 10 * 2 = 20;
- waitForPendingQueueResource(t, queue, 20, 1000)
- waitForPendingQueueResource(t, rootQ, 20, 1000)
+	waitForQueuesPendingResource(t, []*objects.Queue{queueA, rootQ}, 20,
1000)
waitForPendingAppResource(t, app, 20, 1000)
- assert.Equal(t, app01.CurrentState(), objects.Accepted.String())
+
+ assert.Equal(t, app.CurrentState(), objects.Accepted.String())
ms.scheduler.MultiStepSchedule(5)
ms.mockRM.waitForAllocations(t, 2, 1000)
// Make sure pending resource updated to 0
- waitForPendingQueueResource(t, queue, 0, 1000)
- waitForPendingQueueResource(t, rootQ, 0, 1000)
+ waitForQueuesPendingResource(t, []*objects.Queue{queueA, rootQ}, 0,
1000)
waitForPendingAppResource(t, app, 0, 1000)
// Check allocated resources of queues, apps
- assert.Equal(t, queue.GetAllocatedResource().Resources[common.Memory],
resources.Quantity(20))
+ assert.Equal(t, queueA.GetAllocatedResource().Resources[common.Memory],
resources.Quantity(20))
assert.Equal(t, rootQ.GetAllocatedResource().Resources[common.Memory],
resources.Quantity(20))
assert.Equal(t, app.GetAllocatedResource().Resources[common.Memory],
resources.Quantity(20))
// once we start to process allocation asks from this app, verify the
state again
- assert.Equal(t, app01.CurrentState(), objects.Running.String())
+ assert.Equal(t, app.CurrentState(), objects.Running.String())
// Check allocated resources of nodes
waitForAllocatedNodeResource(t, ms.scheduler.GetClusterContext(),
"[rm:123]default",
@@ -185,16 +104,7 @@ func TestSchedulerRecovery(t *testing.T) {
asks := make([]*si.Allocation, 4)
mem := [4]int64{50, 100, 50, 100}
for i := 0; i < 4; i++ {
- asks[i] = &si.Allocation{
- AllocationKey: fmt.Sprintf("alloc-%d", i+2),
- ResourcePerAlloc: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: mem[i]},
- "vcore": {Value: 5},
- },
- },
- ApplicationID: appID1,
- }
+ asks[i] = createAllocation(fmt.Sprintf("alloc-%d", i+2), "",
appID1, int(mem[i]), 5, "", false)
}
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Allocations: asks,
@@ -205,8 +115,7 @@ func TestSchedulerRecovery(t *testing.T) {
// Wait pending resource of queue a and scheduler queue
// Both pending memory = 50 * 2 + 100 * 2 = 300;
- waitForPendingQueueResource(t, queue, 300, 1000)
- waitForPendingQueueResource(t, rootQ, 300, 1000)
+ waitForQueuesPendingResource(t, []*objects.Queue{queueA, rootQ}, 300,
1000)
waitForPendingAppResource(t, app, 300, 1000)
// Now app-1 uses 20 resource, and queue-a's max = 150, so it can get
two 50 container allocated.
@@ -215,12 +124,11 @@ func TestSchedulerRecovery(t *testing.T) {
ms.mockRM.waitForAllocations(t, 4, 3000)
// Check pending resource, should be 200 now.
- waitForPendingQueueResource(t, queue, 200, 1000)
- waitForPendingQueueResource(t, rootQ, 200, 1000)
+ waitForQueuesPendingResource(t, []*objects.Queue{queueA, rootQ}, 200,
1000)
waitForPendingAppResource(t, app, 200, 1000)
// Check allocated resources of queues, apps
- assert.Equal(t, queue.GetAllocatedResource().Resources[common.Memory],
resources.Quantity(120))
+ assert.Equal(t, queueA.GetAllocatedResource().Resources[common.Memory],
resources.Quantity(120))
assert.Equal(t, rootQ.GetAllocatedResource().Resources[common.Memory],
resources.Quantity(120))
assert.Equal(t, app.GetAllocatedResource().Resources[common.Memory],
resources.Quantity(120))
@@ -234,51 +142,8 @@ func TestSchedulerRecovery(t *testing.T) {
// keep the existing mockRM
mockRM := ms.mockRM
ms.serviceContext.StopAll()
- // restart
- err = ms.Init(configData, false, false)
- assert.NilError(t, err, "2nd RegisterResourceManager failed")
-
- // Register nodes, and add apps
- err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
- New: newAddAppRequest(map[string]string{appID1: "root.a"}),
- RmID: "rm:123",
- })
- assert.NilError(t, err, "ApplicationRequest failed")
- ms.mockRM.waitForAcceptedApplication(t, appID1, 1000)
-
- err = ms.proxy.UpdateNode(&si.NodeRequest{
- Nodes: []*si.NodeInfo{
- {
- NodeID: "node-1:1234",
- Attributes: map[string]string{},
- SchedulableResource: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 100},
- "vcore": {Value: 20},
- },
- },
- Action: si.NodeInfo_CREATE,
- },
- {
- NodeID: "node-2:1234",
- Attributes: map[string]string{},
- SchedulableResource: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 100},
- "vcore": {Value: 20},
- },
- },
- Action: si.NodeInfo_CREATE,
- },
- },
- RmID: "rm:123",
- })
- assert.NilError(t, err, "NodeRequest nodes and app for recovery failed")
- ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
- ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
- // verify partition info
- part = ms.scheduler.GetClusterContext().GetPartition(ms.partitionName)
+ part, rootQ, queueA := doRecoverySetup(t, configData, ms, true, false,
[]string{"node-1:1234", "node-2:1234"}, true, []string{appID1}, nil)
// add allocs to partition
node1Allocations := mockRM.nodeAllocations["node-1:1234"]
@@ -301,24 +166,19 @@ func TestSchedulerRecovery(t *testing.T) {
assert.Equal(t, len(node1Allocations),
len(part.GetNode("node-1:1234").GetYunikornAllocations()), "allocations on
node-1 not as expected")
	assert.Equal(t, len(node2Allocations),
len(part.GetNode("node-2:1234").GetYunikornAllocations()), "allocations on
node-2 not as expected")
- node1AllocatedMemory :=
part.GetNode("node-1:1234").GetAllocatedResource().Resources[common.Memory]
- node2AllocatedMemory :=
part.GetNode("node-2:1234").GetAllocatedResource().Resources[common.Memory]
- node1AllocatedCPU :=
part.GetNode("node-1:1234").GetAllocatedResource().Resources[common.CPU]
- node2AllocatedCPU :=
part.GetNode("node-2:1234").GetAllocatedResource().Resources[common.CPU]
- assert.Equal(t, node1AllocatedMemory+node2AllocatedMemory,
resources.Quantity(120))
- assert.Equal(t, node1AllocatedCPU+node2AllocatedCPU,
resources.Quantity(12))
+ node1Allocated := part.GetNode("node-1:1234").GetAllocatedResource()
+ node2Allocated := part.GetNode("node-2:1234").GetAllocatedResource()
+ assert.Equal(t,
node1Allocated.Resources[common.Memory]+node2Allocated.Resources[common.Memory],
resources.Quantity(120))
+ assert.Equal(t,
node1Allocated.Resources[common.CPU]+node2Allocated.Resources[common.CPU],
resources.Quantity(12))
// verify queues
// - verify root queue
- rootQ = part.GetQueue("root")
if rootQ == nil {
t.Fatal("root queue not found on partition")
}
assert.Equal(t, rootQ.GetAllocatedResource().Resources[common.Memory],
resources.Quantity(120), "allocated memory on root queue not as expected")
assert.Equal(t, rootQ.GetAllocatedResource().Resources[common.CPU],
resources.Quantity(12), "allocated vcore on root queue not as expected")
// - verify root.a queue
- childQueues := rootQ.GetCopyOfChildren()
- queueA := childQueues["a"]
assert.Assert(t, queueA != nil, "root.a doesn't exist in partition")
assert.Equal(t, queueA.GetAllocatedResource().Resources[common.Memory],
resources.Quantity(120), "allocated memory on root.a queue not as expected")
assert.Equal(t, queueA.GetAllocatedResource().Resources[common.CPU],
resources.Quantity(12), "allocated vcore on root.a queue not as expected")
@@ -349,67 +209,19 @@ func TestSchedulerRecovery2Allocations(t *testing.T) {
// --------------------------------------------------
ms := &mockScheduler{}
defer ms.Stop()
-
- err := ms.Init(configData, false, false)
- assert.NilError(t, err, "RegisterResourceManager failed")
-
- // Register node, and add app
- err = ms.proxy.UpdateNode(&si.NodeRequest{
- Nodes: []*si.NodeInfo{
- {
- NodeID: "node-1:1234",
- Attributes: map[string]string{},
- SchedulableResource: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 100},
- "vcore": {Value: 20},
- },
- },
- Action: si.NodeInfo_CREATE,
- },
- },
- RmID: "rm:123",
- })
-
- assert.NilError(t, err, "NodeRequest failed")
- ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
-
- err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
- New: newAddAppRequest(map[string]string{appID1: "root.a"}),
- RmID: "rm:123",
- })
- assert.NilError(t, err, "ApplicationRequest failed")
- ms.mockRM.waitForAcceptedApplication(t, appID1, 1000)
+ doRecoverySetup(t, configData, ms, true, false,
[]string{"node-1:1234"}, true, []string{appID1}, nil)
// Verify app initial state
part := ms.scheduler.GetClusterContext().GetPartition("[rm:123]default")
var app01 *objects.Application
- app01, err = getApplication(part, appID1)
+ app01, err := getApplication(part, appID1)
assert.NilError(t, err, "app not found on partition")
assert.Equal(t, app01.CurrentState(), objects.New.String())
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Allocations: []*si.Allocation{
- {
- AllocationKey: "alloc-1",
- ResourcePerAlloc: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 10},
- "vcore": {Value: 1},
- },
- },
- ApplicationID: appID1,
- },
- {
- AllocationKey: "alloc-2",
- ResourcePerAlloc: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 10},
- "vcore": {Value: 1},
- },
- },
- ApplicationID: appID1,
- },
+ createAllocation("alloc-1", "", appID1, 10, 1, "",
false),
+ createAllocation("alloc-2", "", appID1, 10, 1, "",
false),
},
RmID: "rm:123",
})
@@ -426,36 +238,7 @@ func TestSchedulerRecovery2Allocations(t *testing.T) {
// keep the existing mockRM
mockRM := ms.mockRM
ms.serviceContext.StopAll()
- // restart
- err = ms.Init(configData, false, false)
- assert.NilError(t, err, "2nd RegisterResourceManager failed")
-
- // Register nodes, and add apps
- err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
- New: newAddAppRequest(map[string]string{appID1: "root.a"}),
- RmID: "rm:123",
- })
- assert.NilError(t, err, "ApplicationRequest failed")
- ms.mockRM.waitForAcceptedApplication(t, appID1, 1000)
-
- err = ms.proxy.UpdateNode(&si.NodeRequest{
- Nodes: []*si.NodeInfo{
- {
- NodeID: "node-1:1234",
- Attributes: map[string]string{},
- SchedulableResource: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 100},
- "vcore": {Value: 20},
- },
- },
- Action: si.NodeInfo_CREATE,
- },
- },
- RmID: "rm:123",
- })
- assert.NilError(t, err, "NodeRequest nodes and app for recovery failed")
- ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
+ doRecoverySetup(t, configData, ms, true, false,
[]string{"node-1:1234"}, true, []string{appID1}, nil)
allocs := mockRM.nodeAllocations["node-1:1234"]
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{Allocations:
allocs, RmID: "rm:123"})
assert.NilError(t, err, "failed to update allocations")
@@ -465,72 +248,19 @@ func TestSchedulerRecovery2Allocations(t *testing.T) {
assert.Equal(t, recoveredApp.CurrentState(), objects.Running.String())
}
-// test scheduler recovery when shim doesn't report existing application
+// TestSchedulerRecoveryWithoutAppInfo test scheduler recovery when shim
doesn't report existing application
// but only include existing allocations of this app.
-//
-//nolint:funlen
func TestSchedulerRecoveryWithoutAppInfo(t *testing.T) {
// Register RM
ms := &mockScheduler{}
defer ms.Stop()
- err := ms.Init(configData, false, false)
- assert.NilError(t, err, "RegisterResourceManager failed")
-
- // Register nodes, and add apps
- // here we only report back existing allocations, without registering
applications
- err = ms.proxy.UpdateNode(&si.NodeRequest{
- Nodes: []*si.NodeInfo{
- {
- NodeID: "node-1:1234",
- Attributes: map[string]string{},
- SchedulableResource: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 100},
- "vcore": {Value: 20},
- },
- },
- Action: si.NodeInfo_CREATE,
- },
- {
- NodeID: "node-2:1234",
- Attributes: map[string]string{},
- SchedulableResource: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 100},
- "vcore": {Value: 20},
- },
- },
- Action: si.NodeInfo_CREATE,
- },
- },
- RmID: "rm:123",
- })
- assert.NilError(t, err, "NodeRequest nodes and apps failed")
-
- // waiting for recovery
- ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
- ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
+ part, rootQ, queueA := doRecoverySetup(t, configData, ms, true, false,
[]string{"node-1:1234", "node-2:1234"}, false, []string{appID1}, nil)
+ assert.Equal(t, resources.Quantity(150),
queueA.GetMaxResource().Resources[common.Memory])
- part := ms.scheduler.GetClusterContext().GetPartition("[rm:123]default")
- err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ err := ms.proxy.UpdateAllocation(&si.AllocationRequest{
Allocations: []*si.Allocation{
- {
- AllocationKey: "allocation-key-01",
- ApplicationID: "app-01",
- PartitionName: "default",
- NodeID: "node-1:1234",
- ResourcePerAlloc: &si.Resource{
- Resources: map[string]*si.Quantity{
- common.Memory: {
- Value: 1024,
- },
- common.CPU: {
- Value: 1,
- },
- },
- },
- },
+ createAllocation("allocation-key-01", "node-1:1234",
appID1, 1024, 1, "", false),
},
RmID: "rm:123",
})
@@ -543,51 +273,10 @@ func TestSchedulerRecoveryWithoutAppInfo(t *testing.T) {
assert.Equal(t,
part.GetNode("node-2:1234").GetAllocatedResource().Resources[common.Memory],
resources.Quantity(0))
- // register the node again, with application info attached
- err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
- New: newAddAppRequest(map[string]string{"app-01": "root.a"}),
- RmID: "rm:123",
- })
- assert.NilError(t, err, "ApplicationRequest re-register nodes and app
failed")
- ms.mockRM.waitForAcceptedApplication(t, "app-01", 1000)
-
- err = ms.proxy.UpdateNode(&si.NodeRequest{
- Nodes: []*si.NodeInfo{
- {
- NodeID: "node-1:1234",
- Attributes: map[string]string{},
- SchedulableResource: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 100},
- "vcore": {Value: 20},
- },
- },
- Action: si.NodeInfo_CREATE,
- },
- },
- RmID: "rm:123",
- })
- assert.NilError(t, err, "NodeRequest re-register nodes and app failed")
- ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
-
+ doRecoverySetup(t, configData, ms, false, false,
[]string{"node-1:1234"}, true, []string{appID1}, nil)
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Allocations: []*si.Allocation{
- {
- AllocationKey: "allocation-key-01",
- ApplicationID: "app-01",
- PartitionName: "default",
- NodeID: "node-1:1234",
- ResourcePerAlloc: &si.Resource{
- Resources: map[string]*si.Quantity{
- common.Memory: {
- Value: 100,
- },
- common.CPU: {
- Value: 1,
- },
- },
- },
- },
+ createAllocation("allocation-key-01", "node-1:1234",
appID1, 100, 1, "", false),
},
RmID: "rm:123",
})
@@ -602,8 +291,6 @@ func TestSchedulerRecoveryWithoutAppInfo(t *testing.T) {
assert.Equal(t,
part.GetNode("node-2:1234").GetAllocatedResource().Resources[common.CPU],
resources.Quantity(0))
t.Log("verifying scheduling queues")
- rootQ := part.GetQueue("root")
- queueA := part.GetQueue("root.a")
assert.Equal(t, queueA.GetAllocatedResource().Resources[common.Memory],
resources.Quantity(100))
assert.Equal(t, queueA.GetAllocatedResource().Resources[common.CPU],
resources.Quantity(1))
assert.Equal(t, rootQ.GetAllocatedResource().Resources[common.Memory],
resources.Quantity(100))
@@ -612,69 +299,12 @@ func TestSchedulerRecoveryWithoutAppInfo(t *testing.T) {
// test scheduler recovery that only registers nodes and apps
func TestAppRecovery(t *testing.T) {
- serviceContext := entrypoint.StartAllServicesWithManualScheduler()
- defer serviceContext.StopAll()
- proxy := serviceContext.RMProxy
-
- BuildInfoMap := make(map[string]string)
- BuildInfoMap["k"] = "v"
-
- // Register RM
- mockRM := newMockRMCallbackHandler()
-
- _, err := proxy.RegisterResourceManager(
- &si.RegisterResourceManagerRequest{
- RmID: "rm:123",
- PolicyGroup: "policygroup",
- Version: "0.0.2",
- BuildInfo: BuildInfoMap,
- Config: configData,
- }, mockRM)
-
- assert.NilError(t, err, "RegisterResourceManager failed")
-
- // Register nodes, and add apps
- err = proxy.UpdateApplication(&si.ApplicationRequest{
- New: newAddAppRequest(map[string]string{appID1: "root.a"}),
- RmID: "rm:123",
- })
- assert.NilError(t, err, "ApplicationRequest nodes and apps failed")
- mockRM.waitForAcceptedApplication(t, appID1, 1000)
-
- err = proxy.UpdateNode(&si.NodeRequest{
- Nodes: []*si.NodeInfo{
- {
- NodeID: "node-1:1234",
- Attributes: map[string]string{},
- SchedulableResource: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 100},
- "vcore": {Value: 20},
- },
- },
- Action: si.NodeInfo_CREATE,
- },
- {
- NodeID: "node-2:1234",
- Attributes: map[string]string{},
- SchedulableResource: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 100},
- "vcore": {Value: 20},
- },
- },
- Action: si.NodeInfo_CREATE,
- },
- },
- RmID: "rm:123",
- })
- assert.NilError(t, err, "NodeRequest nodes and apps failed")
+ ms := &mockScheduler{}
+ defer ms.Stop()
- // waiting for recovery
- mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
- mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
+ part, _, _ := doRecoverySetup(t, configData, ms, true, false,
[]string{"node-1:1234", "node-2:1234"}, true, []string{appID1}, nil)
- app :=
serviceContext.Scheduler.GetClusterContext().GetApplication(appID1,
"[rm:123]default")
+ app := part.GetApplication(appID1)
assert.Assert(t, app != nil, "application not found after recovery")
assert.Equal(t, app.ApplicationID, appID1)
assert.Equal(t, app.GetQueuePath(), "root.a")
@@ -682,40 +312,12 @@ func TestAppRecovery(t *testing.T) {
// test scheduler recovery that only registers apps
func TestAppRecoveryAlone(t *testing.T) {
- serviceContext := entrypoint.StartAllServicesWithManualScheduler()
- defer serviceContext.StopAll()
- proxy := serviceContext.RMProxy
-
- BuildInfoMap := make(map[string]string)
- BuildInfoMap["k"] = "v"
-
- // Register RM
- mockRM := newMockRMCallbackHandler()
-
- _, err := proxy.RegisterResourceManager(
- &si.RegisterResourceManagerRequest{
- RmID: "rm:123",
- PolicyGroup: "policygroup",
- Version: "0.0.2",
- BuildInfo: BuildInfoMap,
- Config: configData,
- }, mockRM)
-
- assert.NilError(t, err, "RegisterResourceManager failed")
-
- // Register apps alone
- err = proxy.UpdateApplication(&si.ApplicationRequest{
- New: newAddAppRequest(map[string]string{appID1: "root.a",
appID2: "root.a"}),
- RmID: "rm:123",
- })
-
- assert.NilError(t, err, "ApplicationRequest app failed")
-
- mockRM.waitForAcceptedApplication(t, appID1, 1000)
- mockRM.waitForAcceptedApplication(t, appID2, 1000)
+ ms := &mockScheduler{}
+ defer ms.Stop()
+ part, _, _ := doRecoverySetup(t, configData, ms, true, false,
[]string{"node-1:1234", "node-2:1234"}, true, []string{appID1, appID2}, nil)
// verify app state
- apps :=
serviceContext.Scheduler.GetClusterContext().GetPartition("[rm:123]default").GetApplications()
+ apps := part.GetApplications()
found := 0
for _, app := range apps {
if app.ApplicationID == appID1 || app.ApplicationID == appID2 {
@@ -723,7 +325,6 @@ func TestAppRecoveryAlone(t *testing.T) {
found++
}
}
-
assert.Equal(t, found, 2, "did not find expected number of apps after
recovery")
}
@@ -736,8 +337,6 @@ func TestAppRecoveryAlone(t *testing.T) {
// new allocations. But during the recovery, when we recover existing
// allocations on node, we need to ensure the placement rule is still
// enforced.
-//
-//nolint:funlen
func TestAppRecoveryPlacement(t *testing.T) {
// Register RM
configData := `
@@ -757,91 +356,17 @@ partitions:
ms := &mockScheduler{}
defer ms.Stop()
- err := ms.Init(configData, false, false)
- assert.NilError(t, err, "RegisterResourceManager failed")
-
- // initially there is only 1 root queue exist
- part := ms.scheduler.GetClusterContext().GetPartition(ms.partitionName)
- rootQ := part.GetQueue("root")
- assert.Equal(t, len(rootQ.GetCopyOfChildren()), 0, "unexpected child
queue(s) found")
-
- // Register nodes, and add apps
- err = ms.proxy.UpdateNode(&si.NodeRequest{
- Nodes: []*si.NodeInfo{
- {
- NodeID: "node-1:1234",
- Attributes: map[string]string{},
- SchedulableResource: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 100},
- "vcore": {Value: 20},
- },
- },
- Action: si.NodeInfo_CREATE,
- },
- {
- NodeID: "node-2:1234",
- Attributes: map[string]string{},
- SchedulableResource: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 100},
- "vcore": {Value: 20},
- },
- },
- Action: si.NodeInfo_CREATE,
- },
- },
- RmID: "rm:123",
- })
-
- assert.NilError(t, err, "NodeRequest nodes and apps failed")
-
- err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
- New: []*si.AddApplicationRequest{{
- ApplicationID: appID1,
- QueueName: "",
- PartitionName: "",
- Tags: map[string]string{"namespace":
"app-1-namespace"},
- Ugi: &si.UserGroupInformation{
- User: "test-user",
- },
- }},
- RmID: "rm:123",
- })
-
- assert.NilError(t, err, "ApplicationRequest nodes and apps failed")
-
- ms.mockRM.waitForAcceptedApplication(t, appID1, 1000)
- ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
- ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
+ part, rootQ, _ := doRecoverySetup(t, configData, ms, true, false,
[]string{"node-1:1234", "node-2:1234"}, true, []string{appID1},
map[string]string{"namespace": "app-1-namespace"})
// now the queue should have been created under root.app-1-namespace
assert.Equal(t, len(rootQ.GetCopyOfChildren()), 1)
appQueue := part.GetQueue("root.app-1-namespace")
assert.Assert(t, appQueue != nil, "application queue was not created")
- err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ err := ms.proxy.UpdateAllocation(&si.AllocationRequest{
Allocations: []*si.Allocation{
- {
- AllocationKey: "alloc-1",
- ResourcePerAlloc: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 10},
- "vcore": {Value: 1},
- },
- },
- ApplicationID: appID1,
- },
- {
- AllocationKey: "alloc-2",
- ResourcePerAlloc: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 10},
- "vcore": {Value: 1},
- },
- },
- ApplicationID: appID1,
- },
+ createAllocation("alloc-1", "", appID1, 10, 1, "",
false),
+ createAllocation("alloc-2", "", appID1, 10, 1, "",
false),
},
RmID: "rm:123",
})
@@ -900,68 +425,7 @@ partitions:
ms.serviceContext.StopAll()
// restart
- err = ms.Init(configData, false, false)
- assert.NilError(t, err, "2nd RegisterResourceManager failed")
- part = ms.scheduler.GetClusterContext().GetPartition(ms.partitionName)
- rootQ = part.GetQueue("root")
- assert.Equal(t, len(rootQ.GetCopyOfChildren()), 0)
-
- // first recover apps
- err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
- New: []*si.AddApplicationRequest{
- {
- ApplicationID: appID1,
- QueueName: "",
- PartitionName: "",
- Tags: map[string]string{"namespace":
"app-1-namespace"},
- Ugi: &si.UserGroupInformation{
- User: "test-user",
- },
- },
- },
- RmID: "rm:123",
- })
-
- assert.NilError(t, err, "ApplicationRequest add app failed")
-
- // waiting for recovery
- ms.mockRM.waitForAcceptedApplication(t, appID1, 1000)
-
- // recover nodes
- err = ms.proxy.UpdateNode(&si.NodeRequest{
- Nodes: []*si.NodeInfo{
- {
- NodeID: "node-1:1234",
- Attributes: map[string]string{},
- SchedulableResource: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 100},
- "vcore": {Value: 20},
- },
- },
- Action: si.NodeInfo_CREATE,
- },
- {
- NodeID: "node-2:1234",
- Attributes: map[string]string{},
- SchedulableResource: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 100},
- "vcore": {Value: 20},
- },
- },
- Action: si.NodeInfo_CREATE,
- },
- },
- RmID: "rm:123",
- })
-
- assert.NilError(t, err, "NodeRequest nodes failed")
-
- // waiting for recovery
- ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
- ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
- ms.mockRM.waitForAcceptedApplication(t, appID1, 1000)
+ part, rootQ, _ = doRecoverySetup(t, configData, ms, true, false,
[]string{"node-1:1234", "node-2:1234"}, true, []string{appID1},
map[string]string{"namespace": "app-1-namespace"})
err = registerAllocations(part, toRecover["node-1:1234"])
assert.NilError(t, err)
@@ -974,26 +438,10 @@ partitions:
assert.Assert(t, appQueue != nil, "application queue was not created
after recovery")
}
-func TestPlaceholderRecovery(t *testing.T) { //nolint:funlen
+func TestPlaceholderRecovery(t *testing.T) {
// create an existing allocation
existingAllocations := make([]*si.Allocation, 1)
- existingAllocations[0] = &si.Allocation{
- AllocationKey: "ph-alloc-1",
- NodeID: "node-1:1234",
- ApplicationID: appID1,
- TaskGroupName: "tg-1",
- ResourcePerAlloc: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {
- Value: 10,
- },
- "vcore": {
- Value: 1,
- },
- },
- },
- Placeholder: true,
- }
+ existingAllocations[0] = createAllocation("ph-alloc-1", "node-1:1234",
appID1, 10, 1, "tg-1", true)
config := `partitions:
- name: default
@@ -1001,42 +449,14 @@ func TestPlaceholderRecovery(t *testing.T) {
//nolint:funlen
- name: root
submitacl: "*"
queues:
- - name: default`
+ - name: a`
ms := &mockScheduler{}
defer ms.Stop()
- err := ms.Init(config, true, false)
- assert.NilError(t, err, "RegisterResourceManager failed")
-
- // Add application
- err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
- New: newAddAppRequest(map[string]string{appID1:
"root.default"}),
- RmID: "rm:123",
- })
- assert.NilError(t, err, "ApplicationRequest failed")
- ms.mockRM.waitForAcceptedApplication(t, appID1, 1000)
- // Add node
- err = ms.proxy.UpdateNode(&si.NodeRequest{
- Nodes: []*si.NodeInfo{
- {
- NodeID: "node-1:1234",
- Attributes: map[string]string{},
- SchedulableResource: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 100},
- "vcore": {Value: 20},
- },
- },
- Action: si.NodeInfo_CREATE,
- },
- },
- RmID: "rm:123",
- })
- assert.NilError(t, err, "NodeRequest nodes and app for recovery failed")
- ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
+ doRecoverySetup(t, config, ms, true, true, []string{"node-1:1234"},
true, []string{appID1}, nil)
// Add existing allocations
- err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ err := ms.proxy.UpdateAllocation(&si.AllocationRequest{
Allocations: existingAllocations,
RmID: "rm:123",
})
@@ -1046,18 +466,7 @@ func TestPlaceholderRecovery(t *testing.T) {
//nolint:funlen
// Add a new placeholder ask with a different task group
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Allocations: []*si.Allocation{
- {
- AllocationKey: "ph-alloc-2",
- ResourcePerAlloc: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 10},
- "vcore": {Value: 1},
- },
- },
- ApplicationID: appID1,
- TaskGroupName: "tg-2",
- Placeholder: true,
- },
+ createAllocation("ph-alloc-2", "", appID1, 10, 1,
"tg-2", true),
},
RmID: "rm:123",
})
@@ -1067,28 +476,8 @@ func TestPlaceholderRecovery(t *testing.T) {
//nolint:funlen
// Add two real asks
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Allocations: []*si.Allocation{
- {
- AllocationKey: "real-alloc-1",
- ResourcePerAlloc: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 10},
- "vcore": {Value: 1},
- },
- },
- ApplicationID: appID1,
- TaskGroupName: "tg-1",
- },
- {
- AllocationKey: "real-alloc-2",
- ResourcePerAlloc: &si.Resource{
- Resources: map[string]*si.Quantity{
- "memory": {Value: 10},
- "vcore": {Value: 1},
- },
- },
- ApplicationID: appID1,
- TaskGroupName: "tg-2",
- },
+ createAllocation("real-alloc-1", "", appID1, 10, 1,
"tg-1", false),
+ createAllocation("real-alloc-2", "", appID1, 10, 1,
"tg-2", false),
},
RmID: "rm:123",
})
@@ -1099,18 +488,8 @@ func TestPlaceholderRecovery(t *testing.T) {
//nolint:funlen
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Releases: &si.AllocationReleasesRequest{
AllocationsToRelease: []*si.AllocationRelease{
- {
- ApplicationID: appID1,
- PartitionName: "default",
- AllocationKey: "ph-alloc-1",
- TerminationType:
si.TerminationType_PLACEHOLDER_REPLACED,
- },
- {
- ApplicationID: appID1,
- PartitionName: "default",
- AllocationKey: "ph-alloc-2",
- TerminationType:
si.TerminationType_PLACEHOLDER_REPLACED,
- },
+ createAllocationRelease(appID1, "default",
"ph-alloc-1", si.TerminationType_PLACEHOLDER_REPLACED),
+ createAllocationRelease(appID1, "default",
"ph-alloc-2", si.TerminationType_PLACEHOLDER_REPLACED),
},
},
RmID: "rm:123",
@@ -1121,18 +500,8 @@ func TestPlaceholderRecovery(t *testing.T) {
//nolint:funlen
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Releases: &si.AllocationReleasesRequest{
AllocationsToRelease: []*si.AllocationRelease{
- {
- ApplicationID: appID1,
- PartitionName: "default",
- AllocationKey: "real-alloc-1",
- TerminationType:
si.TerminationType_STOPPED_BY_RM,
- },
- {
- ApplicationID: appID1,
- PartitionName: "default",
- AllocationKey: "real-alloc-2",
- TerminationType:
si.TerminationType_STOPPED_BY_RM,
- },
+ createAllocationRelease(appID1, "default",
"real-alloc-1", si.TerminationType_STOPPED_BY_RM),
+ createAllocationRelease(appID1, "default",
"real-alloc-2", si.TerminationType_STOPPED_BY_RM),
},
},
RmID: "rm:123",
@@ -1142,6 +511,106 @@ func TestPlaceholderRecovery(t *testing.T) {
//nolint:funlen
ms.mockRM.waitForApplicationState(t, appID1, "Completing", 1000)
}
+// TestSchedulerRecoveryQuotaPreemption tests scheduler recovery with quota
preemption when the shim doesn't report the existing application
+// but only includes existing allocations of this app.
+func TestSchedulerRecoveryQuotaPreemption(t *testing.T) {
+ config := `
+partitions:
+ - name: default
+ preemption:
+ ENABLED_STR
+ queues:
+ - name: root
+ properties:
+ PROPERTIES_PARENT_STR
+ submitacl: "*"
+ queues:
+ - name: a
+ properties:
+ PROPERTIES_STR
+ resources:
+ guaranteed:
+ memory: 5
+ vcore: 2
+ max:
+ memory: 5
+ vcore: 2
+`
+ configPreemptionDefault := strings.ReplaceAll(config, "ENABLED_STR", "")
+ configPreemptionDefault = strings.ReplaceAll(configPreemptionDefault,
"PROPERTIES_PARENT_STR", "")
+ configPreemptionDefault = strings.ReplaceAll(configPreemptionDefault,
"PROPERTIES_STR", configs.QuotaPreemptionDelay+": 15m")
+
+ configPreemptionDisabled := strings.ReplaceAll(config, "ENABLED_STR",
"quotapreemptionenabled: false")
+ configPreemptionDisabled = strings.ReplaceAll(configPreemptionDisabled,
"PROPERTIES_PARENT_STR", "")
+ configPreemptionDisabled = strings.ReplaceAll(configPreemptionDisabled,
"PROPERTIES_STR", configs.QuotaPreemptionDelay+": 15m")
+ configPreemptionEnabled := strings.ReplaceAll(config, "ENABLED_STR",
"quotapreemptionenabled: true")
+ configPreemptionEnabled = strings.ReplaceAll(configPreemptionEnabled,
"PROPERTIES_PARENT_STR", "")
+ configPreemptionEnabled = strings.ReplaceAll(configPreemptionEnabled,
"PROPERTIES_STR", "")
+ configPreemptionEnabledAndDelaySet := strings.ReplaceAll(config,
"ENABLED_STR", "quotapreemptionenabled: true")
+ configPreemptionEnabledAndDelaySet =
strings.ReplaceAll(configPreemptionEnabledAndDelaySet, "PROPERTIES_PARENT_STR",
"")
+ configPreemptionEnabledAndDelaySet =
strings.ReplaceAll(configPreemptionEnabledAndDelaySet, "PROPERTIES_STR",
configs.QuotaPreemptionDelay+": 15m")
+ configPreemptionEnabledAndDelaySetAtParent :=
strings.ReplaceAll(config, "ENABLED_STR", "quotapreemptionenabled: true")
+ configPreemptionEnabledAndDelaySetAtParent =
strings.ReplaceAll(configPreemptionEnabledAndDelaySetAtParent,
"PROPERTIES_PARENT_STR", configs.QuotaPreemptionDelay+": 15m")
+ configPreemptionEnabledAndDelaySetAtParent =
strings.ReplaceAll(configPreemptionEnabledAndDelaySetAtParent,
"PROPERTIES_STR", "")
+
+ tests := []struct {
+ name string
+ config string
+ allocated *resources.Resource
+ addedAllocations int
+ }{
+ {"preemption not configured explicitly at partition level",
configPreemptionDefault,
resources.NewResourceFromMap(map[string]resources.Quantity{common.Memory: 6}),
1},
+ {"preemption disabled at partition level",
configPreemptionDisabled,
resources.NewResourceFromMap(map[string]resources.Quantity{common.Memory: 6}),
1},
+ {"preemption enabled at partition level, but delay not set at
queue level", configPreemptionEnabled,
resources.NewResourceFromMap(map[string]resources.Quantity{common.Memory: 6}),
1},
+ {"preemption enabled, delay set but usage is lower than max
resources", configPreemptionEnabledAndDelaySet,
resources.NewResourceFromMap(map[string]resources.Quantity{common.Memory: 4}),
1},
+ {"preemption enabled, delay set, usage is higher than max
resources", configPreemptionEnabledAndDelaySet,
resources.NewResourceFromMap(map[string]resources.Quantity{common.Memory: 6}),
0},
+ {"preemption enabled, delay set at parent queue but inherited
and usage is higher than max resources in leaf queue.",
configPreemptionEnabledAndDelaySetAtParent,
resources.NewResourceFromMap(map[string]resources.Quantity{common.Memory: 6}),
0},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Register RM
+ ms := &mockScheduler{}
+ part, _, _ := doRecoverySetup(t, tt.config, ms, true,
false, []string{"node-1:1234"}, true, []string{appID1}, nil)
+
+ err := ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ Allocations: []*si.Allocation{
+ createAllocation("allocation-key-01",
"node-1:1234", appID1, 1024, 1, "", false),
+ },
+ RmID: "rm:123",
+ })
+
+ assert.NilError(t, err)
+
+ // verify partition resources
+ assert.Equal(t, part.GetTotalNodeCount(), 1)
+ assert.Equal(t, part.GetTotalAllocationCount(), 0)
+ assert.Equal(t,
part.GetNode("node-1:1234").GetAllocatedResource().Resources[common.Memory],
+ resources.Quantity(0))
+ assert.Equal(t,
part.GetNode("node-1:1234").GetAllocatedResource().Resources[common.CPU],
+ resources.Quantity(0))
+
+ ms.serviceContext.StopAll()
+
+ // restart
+ _, _, queueA := doRecoverySetup(t, tt.config, ms, true,
false, []string{"node-1:1234"}, true, []string{appID1}, nil)
+
+ // Set allocated resource to exceed max quota
+ queueA.IncAllocatedResource(tt.allocated, false)
+
+ err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ Allocations: []*si.Allocation{
+ createAllocation("allocation-key-02",
"node-1:1234", appID1, 1, 4, "", false),
+ },
+ RmID: "rm:123",
+ })
+ assert.NilError(t, err)
+ ms.mockRM.waitForAllocations(t, tt.addedAllocations,
1000)
+ ms.Stop()
+ })
+ }
+}
+
func registerAllocations(partition *scheduler.PartitionContext, allocs
[]*si.Allocation) error {
for _, alloc := range allocs {
_, allocCreated, err :=
partition.UpdateAllocation(objects.NewAllocationFromSI(alloc))
@@ -1154,3 +623,106 @@ func registerAllocations(partition
*scheduler.PartitionContext, allocs []*si.All
}
return nil
}
+
+func doRecoverySetup(t *testing.T, config string, ms *mockScheduler, init
bool, autoSchedule bool, nodes []string, addApp bool, apps []string, tags
map[string]string) (*scheduler.PartitionContext, *objects.Queue,
*objects.Queue) {
+ var part *scheduler.PartitionContext
+ var rootQ *objects.Queue
+ var queue *objects.Queue
+ if init {
+ err := ms.Init(config, autoSchedule, false)
+ assert.NilError(t, err, "RegisterResourceManager failed")
+
+ // Check queues of scheduler.GetClusterContext() and scheduler.
+ part =
ms.scheduler.GetClusterContext().GetPartition("[rm:123]default")
+ assert.Assert(t, nil == part.GetTotalPartitionResource())
+
+ // Check scheduling queue root
+ rootQ = part.GetQueue("root")
+ if rootQ == nil {
+ t.Fatal("root queue not found on partition")
+ }
+ assert.Assert(t, nil == rootQ.GetMaxResource())
+
+ if tags != nil {
+ assert.Equal(t, len(rootQ.GetCopyOfChildren()), 0,
"unexpected child queue(s) found")
+ }
+ // Check scheduling queue a
+ queue = part.GetQueue("root.a")
+ }
+
+ if addApp {
+ appsMap := make(map[string]string)
+ for _, app := range apps {
+ appsMap[app] = "root.a"
+ }
+ err := ms.proxy.UpdateApplication(&si.ApplicationRequest{
+ New: newAddAppRequestWithTags(appsMap, tags),
+ RmID: "rm:123",
+ })
+ assert.NilError(t, err, "ApplicationRequest failed")
+ for _, app := range apps {
+ ms.mockRM.waitForAcceptedApplication(t, app, 1000)
+ assert.Equal(t, ms.getApplication(app).CurrentState(),
objects.New.String())
+ }
+ }
+
+ var nodesArr []*si.NodeInfo
+ for _, node := range nodes {
+ nodesArr = append(nodesArr, createNodeInfo(node))
+ }
+
+ // Register node, and add app
+ err := ms.proxy.UpdateNode(&si.NodeRequest{
+ Nodes: nodesArr,
+ RmID: "rm:123",
+ })
+ assert.NilError(t, err, "NodeRequest failed")
+ for _, node := range nodesArr {
+ ms.mockRM.waitForAcceptedNode(t, node.NodeID, 1000)
+ }
+ return part, rootQ, queue
+}
+
+func createNodeInfo(node string) *si.NodeInfo {
+ return &si.NodeInfo{
+ NodeID: node,
+ Attributes: map[string]string{},
+ SchedulableResource: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 100},
+ "vcore": {Value: 20},
+ },
+ },
+ Action: si.NodeInfo_CREATE,
+ }
+}
+
+func createAllocation(name string, nodeId string, app string, mem int, cpu
int, tg string, ph bool) *si.Allocation {
+ return &si.Allocation{
+ AllocationKey: name,
+ ApplicationID: app,
+ PartitionName: "default",
+ NodeID: nodeId,
+ ResourcePerAlloc: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ common.Memory: {
+ Value: int64(mem),
+ },
+ common.CPU: {
+ Value: int64(cpu),
+ },
+ },
+ },
+ TaskGroupName: tg,
+ Placeholder: ph,
+ }
+}
+
+func createAllocationRelease(app string, part string, allocKey string,
terminationType si.TerminationType) *si.AllocationRelease {
+ return &si.AllocationRelease{
+ ApplicationID: app,
+ PartitionName: part,
+ AllocationKey: allocKey,
+ TerminationType: terminationType,
+ }
+}
diff --git a/pkg/scheduler/tests/utilities_test.go
b/pkg/scheduler/tests/utilities_test.go
index f6904c25..711f4036 100644
--- a/pkg/scheduler/tests/utilities_test.go
+++ b/pkg/scheduler/tests/utilities_test.go
@@ -59,6 +59,11 @@ func caller() string {
return funcName
}
+func waitForQueuesPendingResource(t *testing.T, queue []*objects.Queue, memory
resources.Quantity, timeoutMs int) {
+ for _, q := range queue {
+ waitForPendingQueueResource(t, q, memory, timeoutMs)
+ }
+}
func waitForPendingQueueResource(t *testing.T, queue *objects.Queue, memory
resources.Quantity, timeoutMs int) {
err := common.WaitForCondition(10*time.Millisecond,
time.Duration(timeoutMs)*time.Millisecond, func() bool {
return queue.GetPendingResource().Resources[siCommon.Memory] ==
memory
@@ -148,16 +153,25 @@ func getApplication(pc *scheduler.PartitionContext, appID
string) (*objects.Appl
}
func newAddAppRequest(apps map[string]string) []*si.AddApplicationRequest {
+ return newAddAppRequestWithTags(apps, nil)
+}
+
+func newAddAppRequestWithTags(apps map[string]string, tags map[string]string)
[]*si.AddApplicationRequest {
var requests []*si.AddApplicationRequest
for app, queue := range apps {
+ appQ := queue
+ if tags != nil || len(tags) > 0 {
+ appQ = ""
+ }
request := si.AddApplicationRequest{
ApplicationID: app,
- QueueName: queue,
+ QueueName: appQ,
PartitionName: "",
Ugi: &si.UserGroupInformation{
User: "testuser",
Groups: []string{"testgroup"},
},
+ Tags: tags,
}
requests = append(requests, &request)
}
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index be557b11..3be573cf 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -480,6 +480,43 @@ func newPlacementPartition() (*PartitionContext, error) {
return newPartitionContext(conf, rmID, nil, false)
}
+func newQuotaPreemptionConfiguredPartition() (*PartitionContext, error) {
+ var True = true
+ conf := configs.PartitionConfig{
+ Name: "test",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "root",
+ Parent: true,
+ SubmitACL: "*",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "leaf",
+ Parent: false,
+ Queues: nil,
+ Resources: configs.Resources{
+ Max: map[string]string{
+ "memory": "10",
+ "vcore": "10",
+ },
+ },
+ Properties: map[string]string{
+
configs.QuotaPreemptionDelay: "1s",
+ },
+ },
+ },
+ },
+ },
+ PlacementRules: nil,
+ Limits: nil,
+ NodeSortPolicy: configs.NodeSortingPolicy{},
+ Preemption: configs.PartitionPreemptionConfig{
+ QuotaPreemptionEnabled: &True,
+ },
+ }
+ return newPartitionContext(conf, rmID, nil, false)
+}
+
func newApplication(appID, partition, queueName string) *objects.Application {
return newApplicationTags(appID, partition, queueName, nil)
}
@@ -683,6 +720,25 @@ func createQueuesNodes(t *testing.T) *PartitionContext {
return partition
}
+// partition with the expected basic queue hierarchy
+// root -> leaf
+//
+// and 2 nodes: node-1 & node-2
+func createQuotaPreemptionQueuesNodes(t *testing.T) *PartitionContext {
+ partition, err := newQuotaPreemptionConfiguredPartition()
+ assert.NilError(t, err, "test partition create failed with error")
+ var res *resources.Resource
+ res, err = resources.NewResourceFromConf(map[string]string{"vcore":
"12"})
+ assert.NilError(t, err, "failed to create basic resource")
+ err = partition.AddNode(newNodeMaxResource("node-1", res))
+ assert.NilError(t, err, "test node1 add failed unexpected")
+ err = partition.AddNode(newNodeMaxResource("node-2", res))
+ assert.NilError(t, err, "test node2 add failed unexpected")
+ return partition
+}
+
// partition with a sibling relationship for testing preemption
// root -> parent -> {leaf1,leaf2}
//
@@ -703,6 +759,17 @@ func createPreemptionQueuesNodes(t *testing.T)
*PartitionContext {
return partition
}
+func createLeafQueueConfig(max, properties map[string]string)
configs.QueueConfig {
+ return configs.QueueConfig{
+ Name: "leaf",
+ Parent: false,
+ Queues: nil,
+ Resources: configs.Resources{
+ Max: max,
+ },
+ Properties: properties,
+ }
+}
func getTestUserGroup() security.UserGroup {
return security.UserGroup{User: "testuser", Groups:
[]string{"testgroup"}}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]