This is an automated email from the ASF dual-hosted git repository.

mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new ca733441 [YUNIKORN-2107] Allow preemption to be disabled globally 
(#690)
ca733441 is described below

commit ca733441da3b90c6c69388c83b2239dc8b20a5f6
Author: Craig Condit <[email protected]>
AuthorDate: Mon Nov 6 17:14:14 2023 +0530

    [YUNIKORN-2107] Allow preemption to be disabled globally (#690)
    
    Closes: #690
    
    Signed-off-by: Manikandan R <[email protected]>
---
 pkg/common/configs/config.go              |  6 +++---
 pkg/common/configs/config_test.go         | 33 +++++++++++++++++++++++++------
 pkg/scheduler/objects/application.go      |  6 +++---
 pkg/scheduler/objects/application_test.go | 26 ++++++++++++------------
 pkg/scheduler/objects/queue.go            |  6 +++---
 pkg/scheduler/partition.go                | 16 ++++++++++++++-
 pkg/scheduler/partition_test.go           | 24 ++++++++++++++++++++++
 7 files changed, 88 insertions(+), 29 deletions(-)

diff --git a/pkg/common/configs/config.go b/pkg/common/configs/config.go
index 8f34a044..38930650 100644
--- a/pkg/common/configs/config.go
+++ b/pkg/common/configs/config.go
@@ -50,14 +50,14 @@ type PartitionConfig struct {
        Queues            []QueueConfig
        PlacementRules    []PlacementRule           `yaml:",omitempty" 
json:",omitempty"`
        Limits            []Limit                   `yaml:",omitempty" 
json:",omitempty"`
-       Preemption        PartitionPreemptionConfig `yaml:",omitempty" 
json:",omitempty"` // deprecated
+       Preemption        PartitionPreemptionConfig `yaml:",omitempty" 
json:",omitempty"`
        NodeSortPolicy    NodeSortingPolicy         `yaml:",omitempty" 
json:",omitempty"`
        StateDumpFilePath string                    `yaml:",omitempty" 
json:",omitempty"`
 }
 
-// deprecated
+// The partition preemption configuration
 type PartitionPreemptionConfig struct {
-       Enabled bool
+       Enabled *bool `yaml:",omitempty" json:",omitempty"`
 }
 
 // The queue object for each queue:
diff --git a/pkg/common/configs/config_test.go 
b/pkg/common/configs/config_test.go
index 3be785e4..df1fb08c 100644
--- a/pkg/common/configs/config_test.go
+++ b/pkg/common/configs/config_test.go
@@ -610,6 +610,15 @@ partitions:
 
 func TestPartitionPreemptionParameter(t *testing.T) {
        data := `
+partitions:
+  - name: default
+    queues:
+      - name: root
+  - name: "partition-0"
+    queues:
+      - name: root
+`
+       dataEnabled := `
 partitions:
   - name: default
     queues:
@@ -619,18 +628,30 @@ partitions:
   - name: "partition-0"
     queues:
       - name: root
+`
+       dataDisabled := `
+partitions:
+  - name: default
+    queues:
+      - name: root
+    preemption:
+      enabled: false
+  - name: "partition-0"
+    queues:
+      - name: root
 `
        // validate the config and check after the update
        conf, err := CreateConfig(data)
        assert.NilError(t, err, "should expect no error")
+       assert.Assert(t, conf.Partitions[0].Preemption.Enabled == nil, "default 
should be nil")
 
-       if !conf.Partitions[0].Preemption.Enabled {
-               t.Error("default partition's preemption should be enabled.")
-       }
+       conf, err = CreateConfig(dataEnabled)
+       assert.NilError(t, err, "should expect no error")
+       assert.Assert(t, *conf.Partitions[0].Preemption.Enabled, "preemption 
should be enabled")
 
-       if conf.Partitions[1].Preemption.Enabled {
-               t.Error("partition-0's preemption should NOT be enabled by 
default")
-       }
+       conf, err = CreateConfig(dataDisabled)
+       assert.NilError(t, err, "should expect no error")
+       assert.Assert(t, !*conf.Partitions[0].Preemption.Enabled, "preemption 
should be disabled")
 }
 
 func TestParseRule(t *testing.T) {
diff --git a/pkg/scheduler/objects/application.go 
b/pkg/scheduler/objects/application.go
index d301790b..90b909d8 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -944,7 +944,7 @@ func (sa *Application) canReplace(request *AllocationAsk) 
bool {
 }
 
 // tryAllocate will perform a regular allocation of a pending request, 
includes placeholders.
-func (sa *Application) tryAllocate(headRoom *resources.Resource, 
preemptionDelay time.Duration, preemptAttemptsRemaining *int, nodeIterator 
func() NodeIterator, fullNodeIterator func() NodeIterator, getNodeFn 
func(string) *Node) *Allocation {
+func (sa *Application) tryAllocate(headRoom *resources.Resource, 
allowPreemption bool, preemptionDelay time.Duration, preemptAttemptsRemaining 
*int, nodeIterator func() NodeIterator, fullNodeIterator func() NodeIterator, 
getNodeFn func(string) *Node) *Allocation {
        sa.Lock()
        defer sa.Unlock()
        if sa.sortedRequests == nil {
@@ -971,7 +971,7 @@ func (sa *Application) tryAllocate(headRoom 
*resources.Resource, preemptionDelay
                // resource must fit in headroom otherwise skip the request 
(unless preemption could help)
                if !headRoom.FitInMaxUndef(request.GetAllocatedResource()) {
                        // attempt preemption
-                       if *preemptAttemptsRemaining > 0 {
+                       if allowPreemption && *preemptAttemptsRemaining > 0 {
                                *preemptAttemptsRemaining--
                                fullIterator := fullNodeIterator()
                                if fullIterator != nil {
@@ -1034,7 +1034,7 @@ func (sa *Application) tryAllocate(headRoom 
*resources.Resource, preemptionDelay
                        }
 
                        // no nodes qualify, attempt preemption
-                       if *preemptAttemptsRemaining > 0 {
+                       if allowPreemption && *preemptAttemptsRemaining > 0 {
                                *preemptAttemptsRemaining--
                                fullIterator := fullNodeIterator()
                                if fullIterator != nil {
diff --git a/pkg/scheduler/objects/application_test.go 
b/pkg/scheduler/objects/application_test.go
index 0edb57ec..8176f7ad 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -1778,7 +1778,7 @@ func TestTryAllocateNoRequests(t *testing.T) {
 
        app := newApplication(appID1, "default", "root.unknown")
        preemptionAttemptsRemaining := 0
-       alloc := app.tryAllocate(node.GetAvailableResource(), 30*time.Second, 
&preemptionAttemptsRemaining, iterator, iterator, getNode)
+       alloc := app.tryAllocate(node.GetAvailableResource(), true, 
30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
        assert.Check(t, alloc == nil, "unexpected alloc")
 }
 
@@ -1803,7 +1803,7 @@ func TestTryAllocateFit(t *testing.T) {
        assert.NilError(t, err)
 
        preemptionAttemptsRemaining := 0
-       alloc := app.tryAllocate(node.GetAvailableResource(), 30*time.Second, 
&preemptionAttemptsRemaining, iterator, iterator, getNode)
+       alloc := app.tryAllocate(node.GetAvailableResource(), true, 
30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
        assert.Assert(t, alloc != nil, "alloc expected")
        assert.Equal(t, "node1", alloc.GetNodeID(), "wrong node")
 }
@@ -1845,19 +1845,19 @@ func TestTryAllocatePreemptQueue(t *testing.T) {
 
        preemptionAttemptsRemaining := 10
 
-       alloc1 := 
app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 10}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       alloc1 := 
app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 10}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
        assert.Assert(t, alloc1 != nil, "alloc1 expected")
-       alloc2 := 
app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 5}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
+       alloc2 := 
app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 5}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
        assert.Assert(t, alloc2 != nil, "alloc2 expected")
 
        // on first attempt, not enough time has passed
-       alloc3 := 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 0}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
+       alloc3 := 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 0}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
        assert.Assert(t, alloc3 == nil, "alloc3 not expected")
        assert.Assert(t, !alloc2.IsPreempted(), "alloc2 should not have been 
preempted")
 
        // pass the time and try again
        ask3.createTime = ask3.createTime.Add(-30 * time.Second)
-       alloc3 = 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 0}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
+       alloc3 = 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 0}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
        assert.Assert(t, alloc3 == nil, "alloc3 not expected")
        assert.Assert(t, alloc2.IsPreempted(), "alloc2 should have been 
preempted")
 }
@@ -1913,20 +1913,20 @@ func TestTryAllocatePreemptNode(t *testing.T) {
        preemptionAttemptsRemaining := 10
 
        // consume capacity with 'unlimited' app
-       alloc00 := 
app0.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 40}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       alloc00 := 
app0.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 40}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
        assert.Assert(t, alloc00 != nil, "alloc00 expected")
-       alloc01 := 
app0.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 39}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       alloc01 := 
app0.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 39}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
        assert.Assert(t, alloc01 != nil, "alloc01 expected")
 
        // consume remainder of space but not quota
-       alloc1 := 
app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 28}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       alloc1 := 
app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 28}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
        assert.Assert(t, alloc1 != nil, "alloc1 expected")
-       alloc2 := 
app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 23}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       alloc2 := 
app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 23}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
        assert.Assert(t, alloc2 != nil, "alloc2 expected")
 
        // on first attempt, should see a reservation since we're after the 
reservation timeout
        ask3.createTime = ask3.createTime.Add(-10 * time.Second)
-       alloc3 := 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 18}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       alloc3 := 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
        assert.Assert(t, alloc3 != nil, "alloc3 expected")
        assert.Equal(t, "node1", alloc3.GetNodeID(), "wrong node assignment")
        assert.Equal(t, Reserved, alloc3.GetResult(), "expected reservation")
@@ -1936,7 +1936,7 @@ func TestTryAllocatePreemptNode(t *testing.T) {
 
        // pass the time and try again
        ask3.createTime = ask3.createTime.Add(-30 * time.Second)
-       alloc3 = 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 18}), 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       alloc3 = 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
        assert.Assert(t, alloc3 != nil, "alloc3 expected")
        assert.Assert(t, alloc1.IsPreempted(), "alloc1 should have been 
preempted")
 }
@@ -2259,7 +2259,7 @@ func TestAppDoesNotFitEvent(t *testing.T) {
        app.sortedRequests = sr
        attempts := 0
 
-       app.tryAllocate(headroom, time.Second, &attempts, func() NodeIterator {
+       app.tryAllocate(headroom, true, time.Second, &attempts, func() 
NodeIterator {
                return nil
        }, func() NodeIterator {
                return nil
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index a6e9dc83..3d2cec67 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -1297,7 +1297,7 @@ func (sq *Queue) canRunApp(appID string) bool {
 // resources are skipped.
 // Applications are sorted based on the application sortPolicy. Applications 
without pending resources are skipped.
 // Lock free call this all locks are taken when needed in called functions
-func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() 
NodeIterator, getnode func(string) *Node) *Allocation {
+func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() 
NodeIterator, getnode func(string) *Node, allowPreemption bool) *Allocation {
        if sq.IsLeafQueue() {
                // get the headroom
                headRoom := sq.getHeadRoom()
@@ -1309,7 +1309,7 @@ func (sq *Queue) TryAllocate(iterator func() 
NodeIterator, fullIterator func() N
                        if app.IsAccepted() && 
(!sq.canRunApp(app.ApplicationID) || 
!ugm.GetUserManager().CanRunApp(sq.QueuePath, app.ApplicationID, app.user)) {
                                continue
                        }
-                       alloc := app.tryAllocate(headRoom, preemptionDelay, 
&preemptAttemptsRemaining, iterator, fullIterator, getnode)
+                       alloc := app.tryAllocate(headRoom, allowPreemption, 
preemptionDelay, &preemptAttemptsRemaining, iterator, fullIterator, getnode)
                        if alloc != nil {
                                log.Log(log.SchedQueue).Debug("allocation found 
on queue",
                                        zap.String("queueName", sq.QueuePath),
@@ -1326,7 +1326,7 @@ func (sq *Queue) TryAllocate(iterator func() 
NodeIterator, fullIterator func() N
        } else {
                // process the child queues (filters out queues without pending 
requests)
                for _, child := range sq.sortQueues() {
-                       alloc := child.TryAllocate(iterator, fullIterator, 
getnode)
+                       alloc := child.TryAllocate(iterator, fullIterator, 
getnode, allowPreemption)
                        if alloc != nil {
                                return alloc
                        }
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 81d09fc5..caf2296b 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -64,6 +64,7 @@ type PartitionContext struct {
        allocations            int                             // Number of 
allocations on the partition
        reservations           int                             // number of 
reservations
        placeholderAllocations int                             // number of 
placeholder allocations
+       preemptionEnabled      bool                            // whether 
preemption is enabled or not
 
        // The partition write lock must not be held while manipulating an 
application.
        // Scheduling is running continuously as a lock free background task. 
Scheduling an application
@@ -131,6 +132,7 @@ func (pc *PartitionContext) initialPartitionFromConfig(conf 
configs.PartitionCon
        // TODO get the resolver from the config
        pc.userGroupCache = security.GetUserGroupCache("")
        pc.updateNodeSortingPolicy(conf)
+       pc.updatePreemption(conf)
 
        // update limit settings: start at the root
        return ugm.GetUserManager().UpdateConfig(queueConf, conf.Queues[0].Name)
@@ -151,6 +153,11 @@ func (pc *PartitionContext) updateNodeSortingPolicy(conf 
configs.PartitionConfig
        
pc.nodes.SetNodeSortingPolicy(objects.NewNodeSortingPolicy(conf.NodeSortPolicy.Type,
 conf.NodeSortPolicy.ResourceWeights))
 }
 
+// NOTE: this is a lock free call. It should only be called holding the 
PartitionContext lock.
+func (pc *PartitionContext) updatePreemption(conf configs.PartitionConfig) {
+       pc.preemptionEnabled = conf.Preemption.Enabled == nil || 
*conf.Preemption.Enabled
+}
+
 func (pc *PartitionContext) updatePartitionDetails(conf 
configs.PartitionConfig) error {
        pc.Lock()
        defer pc.Unlock()
@@ -165,6 +172,7 @@ func (pc *PartitionContext) updatePartitionDetails(conf 
configs.PartitionConfig)
        }
        pc.rules = &conf.PlacementRules
        pc.updateNodeSortingPolicy(conf)
+       pc.updatePreemption(conf)
        // start at the root: there is only one queue
        queueConf := conf.Queues[0]
        root := pc.root
@@ -796,7 +804,7 @@ func (pc *PartitionContext) tryAllocate() 
*objects.Allocation {
                return nil
        }
        // try allocating from the root down
-       alloc := pc.root.TryAllocate(pc.GetNodeIterator, 
pc.GetFullNodeIterator, pc.GetNode)
+       alloc := pc.root.TryAllocate(pc.GetNodeIterator, 
pc.GetFullNodeIterator, pc.GetNode, pc.isPreemptionEnabled())
        if alloc != nil {
                return pc.allocate(alloc)
        }
@@ -1433,6 +1441,12 @@ func (pc *PartitionContext) 
GetNodeSortingResourceWeights() map[string]float64 {
        return policy.ResourceWeights()
 }
 
+func (pc *PartitionContext) isPreemptionEnabled() bool {
+       pc.RLock()
+       defer pc.RUnlock()
+       return pc.preemptionEnabled
+}
+
 func (pc *PartitionContext) moveTerminatedApp(appID string) {
        app := pc.getApplication(appID)
        // nothing to do if the app is not found on the partition
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index aa867f0f..6040a5c1 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -3307,6 +3307,30 @@ func TestRemoveAllocationAsk(t *testing.T) {
        assertLimits(t, getTestUserGroup(), nil)
 }
 
+func TestUpdatePreemption(t *testing.T) {
+       var True = true
+       var False = false
+
+       partition, err := newBasePartition()
+       assert.NilError(t, err, "Partition creation failed")
+       assert.Assert(t, partition.isPreemptionEnabled(), "preemption should 
be enabled by default")
+
+       partition.updatePreemption(configs.PartitionConfig{})
+       assert.Assert(t, partition.isPreemptionEnabled(), "preemption should 
be enabled by empty config")
+
+       partition.updatePreemption(configs.PartitionConfig{Preemption: 
configs.PartitionPreemptionConfig{}})
+       assert.Assert(t, partition.isPreemptionEnabled(), "preemption should 
be enabled by empty preemption section")
+
+       partition.updatePreemption(configs.PartitionConfig{Preemption: 
configs.PartitionPreemptionConfig{Enabled: nil}})
+       assert.Assert(t, partition.isPreemptionEnabled(), "preemption should 
be enabled by explicit nil")
+
+       partition.updatePreemption(configs.PartitionConfig{Preemption: 
configs.PartitionPreemptionConfig{Enabled: &True}})
+       assert.Assert(t, partition.isPreemptionEnabled(), "preemption should 
be enabled by explicit true")
+
+       partition.updatePreemption(configs.PartitionConfig{Preemption: 
configs.PartitionPreemptionConfig{Enabled: &False}})
+       assert.Assert(t, !partition.isPreemptionEnabled(), "preemption should 
be disabled by explicit false")
+}
+
 func TestUpdateNodeSortingPolicy(t *testing.T) {
        partition, err := newBasePartition()
        if err != nil {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to