This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new ca733441 [YUNIKORN-2107] Allow preemption to be disabled globally (#690)
ca733441 is described below
commit ca733441da3b90c6c69388c83b2239dc8b20a5f6
Author: Craig Condit <[email protected]>
AuthorDate: Mon Nov 6 17:14:14 2023 +0530
[YUNIKORN-2107] Allow preemption to be disabled globally (#690)
Closes: #690
Signed-off-by: Manikandan R <[email protected]>
---
pkg/common/configs/config.go | 6 +++---
pkg/common/configs/config_test.go | 33 +++++++++++++++++++++++++------
pkg/scheduler/objects/application.go | 6 +++---
pkg/scheduler/objects/application_test.go | 26 ++++++++++++------------
pkg/scheduler/objects/queue.go | 6 +++---
pkg/scheduler/partition.go | 16 ++++++++++++++-
pkg/scheduler/partition_test.go | 24 ++++++++++++++++++++++
7 files changed, 88 insertions(+), 29 deletions(-)
diff --git a/pkg/common/configs/config.go b/pkg/common/configs/config.go
index 8f34a044..38930650 100644
--- a/pkg/common/configs/config.go
+++ b/pkg/common/configs/config.go
@@ -50,14 +50,14 @@ type PartitionConfig struct {
Queues []QueueConfig
PlacementRules []PlacementRule `yaml:",omitempty"
json:",omitempty"`
Limits []Limit `yaml:",omitempty"
json:",omitempty"`
- Preemption PartitionPreemptionConfig `yaml:",omitempty"
json:",omitempty"` // deprecated
+ Preemption PartitionPreemptionConfig `yaml:",omitempty"
json:",omitempty"`
NodeSortPolicy NodeSortingPolicy `yaml:",omitempty"
json:",omitempty"`
StateDumpFilePath string `yaml:",omitempty"
json:",omitempty"`
}
-// deprecated
+// The partition preemption configuration
type PartitionPreemptionConfig struct {
- Enabled bool
+ Enabled *bool `yaml:",omitempty" json:",omitempty"`
}
// The queue object for each queue:
diff --git a/pkg/common/configs/config_test.go
b/pkg/common/configs/config_test.go
index 3be785e4..df1fb08c 100644
--- a/pkg/common/configs/config_test.go
+++ b/pkg/common/configs/config_test.go
@@ -610,6 +610,15 @@ partitions:
func TestPartitionPreemptionParameter(t *testing.T) {
data := `
+partitions:
+ - name: default
+ queues:
+ - name: root
+ - name: "partition-0"
+ queues:
+ - name: root
+`
+ dataEnabled := `
partitions:
- name: default
queues:
@@ -619,18 +628,30 @@ partitions:
- name: "partition-0"
queues:
- name: root
+`
+ dataDisabled := `
+partitions:
+ - name: default
+ queues:
+ - name: root
+ preemption:
+ enabled: false
+ - name: "partition-0"
+ queues:
+ - name: root
`
// validate the config and check after the update
conf, err := CreateConfig(data)
assert.NilError(t, err, "should expect no error")
+ assert.Assert(t, conf.Partitions[0].Preemption.Enabled == nil, "default
should be nil")
- if !conf.Partitions[0].Preemption.Enabled {
- t.Error("default partition's preemption should be enabled.")
- }
+ conf, err = CreateConfig(dataEnabled)
+ assert.NilError(t, err, "should expect no error")
+ assert.Assert(t, *conf.Partitions[0].Preemption.Enabled, "preemption
should be enabled")
- if conf.Partitions[1].Preemption.Enabled {
- t.Error("partition-0's preemption should NOT be enabled by
default")
- }
+ conf, err = CreateConfig(dataDisabled)
+ assert.NilError(t, err, "should expect no error")
+ assert.Assert(t, !*conf.Partitions[0].Preemption.Enabled, "preemption
should be disabled")
}
func TestParseRule(t *testing.T) {
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index d301790b..90b909d8 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -944,7 +944,7 @@ func (sa *Application) canReplace(request *AllocationAsk)
bool {
}
// tryAllocate will perform a regular allocation of a pending request,
includes placeholders.
-func (sa *Application) tryAllocate(headRoom *resources.Resource,
preemptionDelay time.Duration, preemptAttemptsRemaining *int, nodeIterator
func() NodeIterator, fullNodeIterator func() NodeIterator, getNodeFn
func(string) *Node) *Allocation {
+func (sa *Application) tryAllocate(headRoom *resources.Resource,
allowPreemption bool, preemptionDelay time.Duration, preemptAttemptsRemaining
*int, nodeIterator func() NodeIterator, fullNodeIterator func() NodeIterator,
getNodeFn func(string) *Node) *Allocation {
sa.Lock()
defer sa.Unlock()
if sa.sortedRequests == nil {
@@ -971,7 +971,7 @@ func (sa *Application) tryAllocate(headRoom
*resources.Resource, preemptionDelay
// resource must fit in headroom otherwise skip the request
(unless preemption could help)
if !headRoom.FitInMaxUndef(request.GetAllocatedResource()) {
// attempt preemption
- if *preemptAttemptsRemaining > 0 {
+ if allowPreemption && *preemptAttemptsRemaining > 0 {
*preemptAttemptsRemaining--
fullIterator := fullNodeIterator()
if fullIterator != nil {
@@ -1034,7 +1034,7 @@ func (sa *Application) tryAllocate(headRoom
*resources.Resource, preemptionDelay
}
// no nodes qualify, attempt preemption
- if *preemptAttemptsRemaining > 0 {
+ if allowPreemption && *preemptAttemptsRemaining > 0 {
*preemptAttemptsRemaining--
fullIterator := fullNodeIterator()
if fullIterator != nil {
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index 0edb57ec..8176f7ad 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -1778,7 +1778,7 @@ func TestTryAllocateNoRequests(t *testing.T) {
app := newApplication(appID1, "default", "root.unknown")
preemptionAttemptsRemaining := 0
- alloc := app.tryAllocate(node.GetAvailableResource(), 30*time.Second,
&preemptionAttemptsRemaining, iterator, iterator, getNode)
+ alloc := app.tryAllocate(node.GetAvailableResource(), true,
30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
assert.Check(t, alloc == nil, "unexpected alloc")
}
@@ -1803,7 +1803,7 @@ func TestTryAllocateFit(t *testing.T) {
assert.NilError(t, err)
preemptionAttemptsRemaining := 0
- alloc := app.tryAllocate(node.GetAvailableResource(), 30*time.Second,
&preemptionAttemptsRemaining, iterator, iterator, getNode)
+ alloc := app.tryAllocate(node.GetAvailableResource(), true,
30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
assert.Assert(t, alloc != nil, "alloc expected")
assert.Equal(t, "node1", alloc.GetNodeID(), "wrong node")
}
@@ -1845,19 +1845,19 @@ func TestTryAllocatePreemptQueue(t *testing.T) {
preemptionAttemptsRemaining := 10
- alloc1 :=
app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ alloc1 :=
app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
assert.Assert(t, alloc1 != nil, "alloc1 expected")
- alloc2 :=
app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
5}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
+ alloc2 :=
app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
5}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
assert.Assert(t, alloc2 != nil, "alloc2 expected")
// on first attempt, not enough time has passed
- alloc3 :=
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
0}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
+ alloc3 :=
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
0}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
assert.Assert(t, alloc3 == nil, "alloc3 not expected")
assert.Assert(t, !alloc2.IsPreempted(), "alloc2 should not have been
preempted")
// pass the time and try again
ask3.createTime = ask3.createTime.Add(-30 * time.Second)
- alloc3 =
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
0}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
+ alloc3 =
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
0}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
assert.Assert(t, alloc3 == nil, "alloc3 not expected")
assert.Assert(t, alloc2.IsPreempted(), "alloc2 should have been
preempted")
}
@@ -1913,20 +1913,20 @@ func TestTryAllocatePreemptNode(t *testing.T) {
preemptionAttemptsRemaining := 10
// consume capacity with 'unlimited' app
- alloc00 :=
app0.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
40}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ alloc00 :=
app0.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
40}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
assert.Assert(t, alloc00 != nil, "alloc00 expected")
- alloc01 :=
app0.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
39}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ alloc01 :=
app0.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
39}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
assert.Assert(t, alloc01 != nil, "alloc01 expected")
// consume remainder of space but not quota
- alloc1 :=
app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
28}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ alloc1 :=
app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
28}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
assert.Assert(t, alloc1 != nil, "alloc1 expected")
- alloc2 :=
app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
23}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ alloc2 :=
app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
23}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
assert.Assert(t, alloc2 != nil, "alloc2 expected")
// on first attempt, should see a reservation since we're after the
reservation timeout
ask3.createTime = ask3.createTime.Add(-10 * time.Second)
- alloc3 :=
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
18}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ alloc3 :=
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
assert.Assert(t, alloc3 != nil, "alloc3 expected")
assert.Equal(t, "node1", alloc3.GetNodeID(), "wrong node assignment")
assert.Equal(t, Reserved, alloc3.GetResult(), "expected reservation")
@@ -1936,7 +1936,7 @@ func TestTryAllocatePreemptNode(t *testing.T) {
// pass the time and try again
ask3.createTime = ask3.createTime.Add(-30 * time.Second)
- alloc3 =
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
18}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ alloc3 =
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
assert.Assert(t, alloc3 != nil, "alloc3 expected")
assert.Assert(t, alloc1.IsPreempted(), "alloc1 should have been
preempted")
}
@@ -2259,7 +2259,7 @@ func TestAppDoesNotFitEvent(t *testing.T) {
app.sortedRequests = sr
attempts := 0
- app.tryAllocate(headroom, time.Second, &attempts, func() NodeIterator {
+ app.tryAllocate(headroom, true, time.Second, &attempts, func()
NodeIterator {
return nil
}, func() NodeIterator {
return nil
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index a6e9dc83..3d2cec67 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -1297,7 +1297,7 @@ func (sq *Queue) canRunApp(appID string) bool {
// resources are skipped.
// Applications are sorted based on the application sortPolicy. Applications
without pending resources are skipped.
// Lock free call this all locks are taken when needed in called functions
-func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func()
NodeIterator, getnode func(string) *Node) *Allocation {
+func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func()
NodeIterator, getnode func(string) *Node, allowPreemption bool) *Allocation {
if sq.IsLeafQueue() {
// get the headroom
headRoom := sq.getHeadRoom()
@@ -1309,7 +1309,7 @@ func (sq *Queue) TryAllocate(iterator func()
NodeIterator, fullIterator func() N
if app.IsAccepted() &&
(!sq.canRunApp(app.ApplicationID) ||
!ugm.GetUserManager().CanRunApp(sq.QueuePath, app.ApplicationID, app.user)) {
continue
}
- alloc := app.tryAllocate(headRoom, preemptionDelay,
&preemptAttemptsRemaining, iterator, fullIterator, getnode)
+ alloc := app.tryAllocate(headRoom, allowPreemption,
preemptionDelay, &preemptAttemptsRemaining, iterator, fullIterator, getnode)
if alloc != nil {
log.Log(log.SchedQueue).Debug("allocation found
on queue",
zap.String("queueName", sq.QueuePath),
@@ -1326,7 +1326,7 @@ func (sq *Queue) TryAllocate(iterator func()
NodeIterator, fullIterator func() N
} else {
// process the child queues (filters out queues without pending
requests)
for _, child := range sq.sortQueues() {
- alloc := child.TryAllocate(iterator, fullIterator,
getnode)
+ alloc := child.TryAllocate(iterator, fullIterator,
getnode, allowPreemption)
if alloc != nil {
return alloc
}
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 81d09fc5..caf2296b 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -64,6 +64,7 @@ type PartitionContext struct {
allocations int // Number of
allocations on the partition
reservations int // number of
reservations
placeholderAllocations int // number of
placeholder allocations
+ preemptionEnabled bool // whether
preemption is enabled or not
// The partition write lock must not be held while manipulating an
application.
// Scheduling is running continuously as a lock free background task.
Scheduling an application
@@ -131,6 +132,7 @@ func (pc *PartitionContext) initialPartitionFromConfig(conf
configs.PartitionCon
// TODO get the resolver from the config
pc.userGroupCache = security.GetUserGroupCache("")
pc.updateNodeSortingPolicy(conf)
+ pc.updatePreemption(conf)
// update limit settings: start at the root
return ugm.GetUserManager().UpdateConfig(queueConf, conf.Queues[0].Name)
@@ -151,6 +153,11 @@ func (pc *PartitionContext) updateNodeSortingPolicy(conf
configs.PartitionConfig
pc.nodes.SetNodeSortingPolicy(objects.NewNodeSortingPolicy(conf.NodeSortPolicy.Type,
conf.NodeSortPolicy.ResourceWeights))
}
+// NOTE: this is a lock free call. It should only be called holding the
PartitionContext lock.
+func (pc *PartitionContext) updatePreemption(conf configs.PartitionConfig) {
+ pc.preemptionEnabled = conf.Preemption.Enabled == nil ||
*conf.Preemption.Enabled
+}
+
func (pc *PartitionContext) updatePartitionDetails(conf
configs.PartitionConfig) error {
pc.Lock()
defer pc.Unlock()
@@ -165,6 +172,7 @@ func (pc *PartitionContext) updatePartitionDetails(conf
configs.PartitionConfig)
}
pc.rules = &conf.PlacementRules
pc.updateNodeSortingPolicy(conf)
+ pc.updatePreemption(conf)
// start at the root: there is only one queue
queueConf := conf.Queues[0]
root := pc.root
@@ -796,7 +804,7 @@ func (pc *PartitionContext) tryAllocate()
*objects.Allocation {
return nil
}
// try allocating from the root down
- alloc := pc.root.TryAllocate(pc.GetNodeIterator,
pc.GetFullNodeIterator, pc.GetNode)
+ alloc := pc.root.TryAllocate(pc.GetNodeIterator,
pc.GetFullNodeIterator, pc.GetNode, pc.isPreemptionEnabled())
if alloc != nil {
return pc.allocate(alloc)
}
@@ -1433,6 +1441,12 @@ func (pc *PartitionContext)
GetNodeSortingResourceWeights() map[string]float64 {
return policy.ResourceWeights()
}
+func (pc *PartitionContext) isPreemptionEnabled() bool {
+ pc.RLock()
+ defer pc.RUnlock()
+ return pc.preemptionEnabled
+}
+
func (pc *PartitionContext) moveTerminatedApp(appID string) {
app := pc.getApplication(appID)
// nothing to do if the app is not found on the partition
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index aa867f0f..6040a5c1 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -3307,6 +3307,30 @@ func TestRemoveAllocationAsk(t *testing.T) {
assertLimits(t, getTestUserGroup(), nil)
}
+func TestUpdatePreemption(t *testing.T) {
+ var True = true
+ var False = false
+
+ partition, err := newBasePartition()
+ assert.NilError(t, err, "Partition creation failed")
+ assert.Assert(t, partition.isPreemptionEnabled(), "preeemption should
be enabled by default")
+
+ partition.updatePreemption(configs.PartitionConfig{})
+ assert.Assert(t, partition.isPreemptionEnabled(), "preeemption should
be enabled by empty config")
+
+ partition.updatePreemption(configs.PartitionConfig{Preemption:
configs.PartitionPreemptionConfig{}})
+ assert.Assert(t, partition.isPreemptionEnabled(), "preeemption should
be enabled by empty preemption section")
+
+ partition.updatePreemption(configs.PartitionConfig{Preemption:
configs.PartitionPreemptionConfig{Enabled: nil}})
+ assert.Assert(t, partition.isPreemptionEnabled(), "preeemption should
be enabled by explicit nil")
+
+ partition.updatePreemption(configs.PartitionConfig{Preemption:
configs.PartitionPreemptionConfig{Enabled: &True}})
+ assert.Assert(t, partition.isPreemptionEnabled(), "preeemption should
be enabled by explicit true")
+
+ partition.updatePreemption(configs.PartitionConfig{Preemption:
configs.PartitionPreemptionConfig{Enabled: &False}})
+ assert.Assert(t, !partition.isPreemptionEnabled(), "preeemption should
be disabled by explicit false")
+}
+
func TestUpdateNodeSortingPolicy(t *testing.T) {
partition, err := newBasePartition()
if err != nil {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]