This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new e41a426c [YUNIKORN-3115] Skip scheduling application if too many pods
are unschedulable (#1046)
e41a426c is described below
commit e41a426c827046a7f7590004076332ca8eb849e1
Author: Peter Bacsko <[email protected]>
AuthorDate: Tue Nov 18 13:35:13 2025 +0100
[YUNIKORN-3115] Skip scheduling application if too many pods are
unschedulable (#1046)
Closes: #1046
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/common/configs/configvalidator.go | 15 +++++----
pkg/scheduler/objects/application.go | 25 ++++++++++++++
pkg/scheduler/objects/application_test.go | 45 +++++++++++++++++++++++++
pkg/scheduler/objects/queue.go | 52 ++++++++++++++++++++++++++++
pkg/scheduler/objects/queue_test.go | 37 ++++++++++++++++++++
pkg/scheduler/partition_test.go | 56 +++++++++++++++++++++++++++++++
pkg/webservice/dao/application_info.go | 3 ++
pkg/webservice/handlers.go | 7 ++++
pkg/webservice/handlers_test.go | 8 +++++
9 files changed, 242 insertions(+), 6 deletions(-)
diff --git a/pkg/common/configs/configvalidator.go
b/pkg/common/configs/configvalidator.go
index 06a6f49d..109d295f 100644
--- a/pkg/common/configs/configvalidator.go
+++ b/pkg/common/configs/configvalidator.go
@@ -41,12 +41,14 @@ const (
DotReplace = "_dot_"
DefaultPartition = "default"
- ApplicationSortPolicy = "application.sort.policy"
- ApplicationSortPriority = "application.sort.priority"
- PriorityPolicy = "priority.policy"
- PriorityOffset = "priority.offset"
- PreemptionPolicy = "preemption.policy"
- PreemptionDelay = "preemption.delay"
+ ApplicationSortPolicy = "application.sort.policy"
+ ApplicationSortPriority = "application.sort.priority"
+ ApplicationUnschedulableAsksBackoff =
"application.unschedasks.backoff"
+ ApplicationUnschedulableAsksBackoffDelay =
"application.unschedasks.backoff.delay"
+ PriorityPolicy = "priority.policy"
+ PriorityOffset = "priority.offset"
+ PreemptionPolicy = "preemption.policy"
+ PreemptionDelay = "preemption.delay"
// app sort priority values
ApplicationSortPriorityEnabled = "enabled"
@@ -66,6 +68,7 @@ var MinPriority int32 = math.MinInt32
var MaxPriority int32 = math.MaxInt32
var DefaultPreemptionDelay = 30 * time.Second
+var DefaultAskBackOffDelay = 30 * time.Second
// A queue can be a username with the dot replaced. Most systems allow a 32
character user name.
// The queue name must thus allow for at least that length with the
replacement of dots.
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index 54244754..cfc096e5 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -118,6 +118,7 @@ type Application struct {
hasPlaceholderAlloc bool // Whether there is at
least one allocated placeholder
runnableInQueue bool // whether the
application is runnable/schedulable in the queue. Default is true.
runnableByUserLimit bool // whether the
application is runnable/schedulable based on user/group quota. Default is true.
+ backoffDeadline time.Time // no scheduling from
this application until this deadline
rmEventHandler handler.EventHandler
rmID string
@@ -990,8 +991,18 @@ func (sa *Application) tryAllocate(headRoom
*resources.Resource, allowPreemption
}
// calculate the users' headroom, includes group check which requires
the applicationID
userHeadroom := ugm.GetUserManager().Headroom(sa.queuePath,
sa.ApplicationID, sa.user)
+ unschedulable := uint64(0)
// get all the requests from the app sorted in order
for _, request := range sa.sortedRequests {
+ backoffThreshold := sa.queue.GetMaxAppUnschedAskBackoff()
+ if backoffThreshold > 0 && unschedulable >= backoffThreshold {
+ log.Log(log.SchedApplication).Info("too many
unschedulable asks in the application, waiting",
+ zap.String("application ID", sa.ApplicationID),
+ zap.Uint64("number of unschedulable asks",
unschedulable))
+ delay := sa.queue.GetBackoffDelay()
+ sa.backoffDeadline = time.Now().Add(delay)
+ return nil
+ }
if request.IsAllocated() {
continue
}
@@ -1060,6 +1071,7 @@ func (sa *Application) tryAllocate(headRoom
*resources.Resource, allowPreemption
request.LogAllocationFailure(common.PreemptionDoesNotHelp, true)
}
}
+ unschedulable++
}
// no requests fit, skip to next app
return nil
@@ -2305,3 +2317,16 @@ func (sa *Application) GetSubmissionTime() time.Time {
defer sa.RUnlock()
return sa.submissionTime
}
+
+// GetBackoffDeadline returns the time until which the scheduler skips this
+// application. The zero time.Time means no backoff is in effect. The value
+// is read under the application read lock.
+func (sa *Application) GetBackoffDeadline() time.Time {
+ sa.RLock()
+ defer sa.RUnlock()
+ return sa.backoffDeadline
+}
+
+// SetBackoffDeadline overwrites the backoff deadline while holding the
+// application write lock. Only used for testing.
+func (sa *Application) SetBackoffDeadline(deadline time.Time) {
+ sa.Lock()
+ defer sa.Unlock()
+ sa.backoffDeadline = deadline
+}
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index 53688323..e00c67e9 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -72,6 +72,7 @@ func TestNewApplication(t *testing.T) {
assert.Equal(t, app.Partition, "", "partition name should not be set
was not set in SI")
assert.Equal(t, app.rmID, "", "RM ID should not be set was not passed
in")
assert.Equal(t, app.rmEventHandler, handler.EventHandler(nil), "event
handler should be nil")
+ assert.Equal(t, app.backoffDeadline, time.Time{})
// just check one of the resources...
assert.Assert(t, resources.IsZero(app.placeholderAsk), "placeholder ask
should be zero")
assert.Assert(t, app.IsNew(), "new application must be in new state")
@@ -3833,3 +3834,47 @@ func TestAppSubmissionTime(t *testing.T) {
app.AddAllocation(alloc2)
assert.Equal(t, app.submissionTime, time.Unix(0, 30), "app submission
time is not set properly")
}
+
+// TestApplicationBackoff adds three asks to an application whose leaf queue
+// sets application.unschedasks.backoff to 2, calls tryAllocate directly and
+// expects no allocation plus a backoff deadline later than the call start.
+func TestApplicationBackoff(t *testing.T) {
+ rootQ, err := createRootQueue(map[string]string{"first": "10"})
+ assert.NilError(t, err, "unexpected error when creating root queue")
+ props := map[string]string{
+ configs.ApplicationUnschedulableAsksBackoff: "2",
+ }
+ leaf, err := createManagedQueueWithProps(rootQ, "leaf", false,
map[string]string{"first": "5"}, props)
+ assert.NilError(t, err, "unexpected error when creating leaf queue")
+
+ // create app
+ app := newApplication(appID0, "default", "root.leaf")
+ app.SetQueue(leaf)
+
+ // create asks
+ askRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 6})
+ ask1 := newAllocationAsk(aKey, appID0, askRes)
+ err = app.AddAllocationAsk(ask1)
+ assert.NilError(t, err)
+ ask2 := newAllocationAsk(aKey2, appID0, askRes)
+ err = app.AddAllocationAsk(ask2)
+ assert.NilError(t, err)
+ ask3 := newAllocationAsk(aKey3, appID0, askRes)
+ err = app.AddAllocationAsk(ask3)
+ assert.NilError(t, err)
+
+ // create nodes (enough cluster capacity, but asks cannot be placed)
+ node1 := newNode("node1", map[string]resources.Quantity{"first": 5})
+ // each node gets its own ID so it matches its key in nodeMap
+ node2 := newNode("node2", map[string]resources.Quantity{"first": 5})
+ nodeMap := map[string]*Node{
+ "node1": node1,
+ "node2": node2,
+ }
+ iterator := getNodeIteratorFn(node1, node2)
+ getNode := func(nodeID string) *Node {
+ return nodeMap[nodeID]
+ }
+ preemptionAttemptsRemaining := 0
+ beforeTryAlloc := time.Now()
+ available :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+ result := app.tryAllocate(available, true, 30*time.Second,
&preemptionAttemptsRemaining, iterator, iterator, getNode)
+ assert.Assert(t, result == nil)
+ // a deadline later than the call start shows the backoff was set by this call
+ assert.Assert(t, app.GetBackoffDeadline().After(beforeTryAlloc))
+}
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index e6b6813e..a56fd460 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -92,6 +92,8 @@ type Queue struct {
quotaChangePreemptionDelay uint64
quotaChangePreemptionStartTime time.Time
isQuotaChangePreemptionRunning bool
+ unschedAskBackoff uint64
+ askBackoffDelay time.Duration
locking.RWMutex
}
@@ -116,6 +118,7 @@ func newBlankQueue() *Queue {
preemptionPolicy:
policies.DefaultPreemptionPolicy,
quotaChangePreemptionDelay: 0,
quotaChangePreemptionStartTime: time.Time{},
+ askBackoffDelay: configs.DefaultAskBackOffDelay,
}
}
@@ -317,6 +320,25 @@ func filterParentProperty(key string, value string) string
{
return value
}
+// unschedulableAskBackoff parses value as a base-10 unsigned 64-bit integer.
+// When value is not a valid unsigned number it returns 0 together with the
+// parse error.
+func unschedulableAskBackoff(value string) (uint64, error) {
+ intValue, err := strconv.ParseUint(value, 10, 64)
+ if err != nil {
+ return 0, err
+ }
+ return intValue, nil
+}
+
+// backoffDelay parses value as a time.Duration. A value that cannot be
+// parsed, or that is zero or negative, yields configs.DefaultAskBackOffDelay
+// together with a non-nil error, so the caller always receives a usable delay.
+func backoffDelay(value string) (time.Duration, error) {
+ result, err := time.ParseDuration(value)
+ if err != nil {
+ return configs.DefaultAskBackOffDelay, err
+ }
+ // time.Duration is an integer type, so it compares against 0 directly
+ if result <= 0 {
+ return configs.DefaultAskBackOffDelay, fmt.Errorf("%s must be
positive: %s", configs.ApplicationUnschedulableAsksBackoffDelay, value)
+ }
+ return result, nil
+}
+
// ApplyConf is the locked version of applyConf
func (sq *Queue) ApplyConf(conf configs.QueueConfig) error {
sq.Lock()
@@ -616,6 +638,20 @@ func (sq *Queue) UpdateQueueProperties() {
zap.Error(err))
}
}
+ case configs.ApplicationUnschedulableAsksBackoff:
+ unschedAskBackoff, err := unschedulableAskBackoff(value)
+ if err != nil {
+ log.Log(log.SchedQueue).Debug("unschedulable
ask backoff configuration error",
+ zap.Error(err))
+ }
+ sq.unschedAskBackoff = unschedAskBackoff
+ case configs.ApplicationUnschedulableAsksBackoffDelay:
+ askBackoffDelay, err := backoffDelay(value)
+ if err != nil {
+ log.Log(log.SchedQueue).Debug("unschedulable
ask backoff delay configuration error",
+ zap.Error(err))
+ }
+ sq.askBackoffDelay = askBackoffDelay
default:
// skip unknown properties just log them
log.Log(log.SchedQueue).Debug("queue property skipped",
@@ -1519,6 +1555,10 @@ func (sq *Queue) TryAllocate(iterator func()
NodeIterator, fullIterator func() N
if app.IsAccepted() && (!runnableInQueue ||
!runnableByUserLimit) {
continue
}
+ deadline := app.GetBackoffDeadline()
+ if !deadline.IsZero() && time.Now().Before(deadline) {
+ continue
+ }
result := app.tryAllocate(headRoom, allowPreemption,
preemptionDelay, &preemptAttemptsRemaining, iterator, fullIterator, getnode)
if result != nil {
log.Log(log.SchedQueue).Info("allocation found
on queue",
@@ -2140,3 +2180,15 @@ func (sq *Queue) IsQuotaChangePreemptionRunning() bool {
defer sq.RUnlock()
return sq.isQuotaChangePreemptionRunning
}
+
+// GetMaxAppUnschedAskBackoff returns the number of unschedulable asks after
+// which an application in this queue stops being scheduled and backs off.
+// A value of 0 disables the backoff. The value is read under the queue read
+// lock.
+func (sq *Queue) GetMaxAppUnschedAskBackoff() uint64 {
+ sq.RLock()
+ defer sq.RUnlock()
+ return sq.unschedAskBackoff
+}
+
+// GetBackoffDelay returns how long an application in this queue is skipped
+// once it has backed off. The value is read under the queue read lock.
+func (sq *Queue) GetBackoffDelay() time.Duration {
+ sq.RLock()
+ defer sq.RUnlock()
+ return sq.askBackoffDelay
+}
diff --git a/pkg/scheduler/objects/queue_test.go
b/pkg/scheduler/objects/queue_test.go
index 64f9fcb4..9c1575d9 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -3014,3 +3014,40 @@ func TestQueue_allocatedResFits_Other(t *testing.T) {
})
}
}
+
+// TestQueueBackoffProperties checks the two backoff queue properties: the
+// values when no property is set, explicitly configured values, the values
+// seen by a child created without properties of its own, and the values
+// that result from unparsable input.
+func TestQueueBackoffProperties(t *testing.T) {
+ root, err := createRootQueue(nil)
+ assert.NilError(t, err, "failed to create basic root queue")
+ // no properties set: threshold 0 and a 30 second delay
+ parent, err := createManagedQueue(root, "parent", true, nil)
+ assert.NilError(t, err, "failed to create parent queue")
+ assert.Equal(t, uint64(0), parent.GetMaxAppUnschedAskBackoff())
+ assert.Equal(t, 30*time.Second, parent.GetBackoffDelay())
+
+ leaf, err := createManagedQueue(parent, "leaf", false, nil)
+ assert.NilError(t, err, "failed to create leaf queue")
+ assert.Equal(t, uint64(0), leaf.GetMaxAppUnschedAskBackoff())
+ assert.Equal(t, 30*time.Second, leaf.GetBackoffDelay())
+
+ // explicit values set on a parent queue
+ props := map[string]string{
+ configs.ApplicationUnschedulableAsksBackoffDelay: "123s",
+ configs.ApplicationUnschedulableAsksBackoff: "12",
+ }
+ parent2, err := createManagedQueueWithProps(root, "parent2", true, nil,
props)
+ assert.NilError(t, err, "failed to create parent queue")
+ assert.Equal(t, uint64(12), parent2.GetMaxAppUnschedAskBackoff())
+ assert.Equal(t, 123*time.Second, parent2.GetBackoffDelay())
+
+ // a child created without properties is expected to report the parent's values
+ leaf2, err := createManagedQueue(parent2, "leaf2", false, nil)
+ assert.NilError(t, err, "failed to create leaf2 queue")
+ assert.Equal(t, uint64(12), leaf2.GetMaxAppUnschedAskBackoff())
+ assert.Equal(t, 123*time.Second, leaf2.GetBackoffDelay())
+
+ // unparsable values: expect threshold 0 and the 30 second delay
+ props = map[string]string{
+ configs.ApplicationUnschedulableAsksBackoffDelay: "xyz",
+ configs.ApplicationUnschedulableAsksBackoff: "-4",
+ }
+ // use a unique queue name so this leaf does not reuse the name of parent2
+ leaf3, err := createManagedQueueWithProps(root, "leaf3", false, nil,
props)
+ assert.NilError(t, err, "failed to create leaf3 queue")
+ assert.Equal(t, uint64(0), leaf3.GetMaxAppUnschedAskBackoff())
+ assert.Equal(t, 30*time.Second, leaf3.GetBackoffDelay())
+}
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 9b8d0ba6..1ee9a4b3 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -4901,3 +4901,59 @@ func TestAppSchedulingOrderFIFO(t *testing.T) {
assert.Assert(t, alloc != nil, "no allocation was made")
assert.Equal(t, "app2-alloc-3", alloc.Request.GetAllocationKey())
}
+
+// TestApplicationBackoff runs one partition scheduling cycle for an
+// application in a queue with application.unschedasks.backoff set to 2 and
+// expects the application to end up with a non-zero backoff deadline that is
+// later than the start of the cycle.
+func TestApplicationBackoff(t *testing.T) {
+ setupUGM()
+ partition, err := newBasePartition()
+ assert.NilError(t, err, "partition create failed")
+ conf := configs.PartitionConfig{
+ Name: "test",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "root",
+ Parent: true,
+ SubmitACL: "*",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "default",
+ Parent: false,
+ Properties: map[string]string{
+
configs.ApplicationUnschedulableAsksBackoff: "2",
+ },
+ },
+ },
+ },
+ },
+ }
+ err = partition.updatePartitionDetails(conf)
+ assert.NilError(t, err, "unable to update partition config")
+
+ // two nodes with 10 "first" each
+ nodeRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+ node1 := newNodeMaxResource(nodeID1, nodeRes)
+ err = partition.AddNode(node1)
+ assert.NilError(t, err)
+ node2 := newNodeMaxResource(nodeID2, nodeRes)
+ err = partition.AddNode(node2)
+ assert.NilError(t, err)
+
+ app := newApplication(appID1, "default", defQueue)
+ err = partition.AddApplication(app)
+ assert.NilError(t, err, "could not add application")
+
+ // three asks of 15 "first" each, larger than either node's 10
+ askRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15})
+ app1Ask1 := newAllocationAsk("alloc-1", appID1, askRes)
+ err = app.AddAllocationAsk(app1Ask1)
+ assert.NilError(t, err, "could not add ask")
+ app1Ask2 := newAllocationAsk("alloc-2", appID1, askRes)
+ err = app.AddAllocationAsk(app1Ask2)
+ assert.NilError(t, err, "could not add ask")
+ app1Ask3 := newAllocationAsk("alloc-3", appID1, askRes)
+ err = app.AddAllocationAsk(app1Ask3)
+ assert.NilError(t, err, "could not add ask")
+
+ // the deadline must be set and must lie after the start of the cycle
+ beforeAlloc := time.Now()
+ partition.tryAllocate()
+ deadline := app.GetBackoffDeadline()
+ assert.Assert(t, !deadline.IsZero())
+ assert.Assert(t, deadline.After(beforeAlloc))
+}
diff --git a/pkg/webservice/dao/application_info.go
b/pkg/webservice/dao/application_info.go
index 53b423ea..d8e7113e 100644
--- a/pkg/webservice/dao/application_info.go
+++ b/pkg/webservice/dao/application_info.go
@@ -18,6 +18,8 @@
package dao
+import "time"
+
type ApplicationsDAOInfo struct {
Applications []ApplicationDAOInfo `json:"applications,omitempty"`
}
@@ -44,6 +46,7 @@ type ApplicationDAOInfo struct {
MaxRequestPriority int32
`json:"maxRequestPriority,omitempty"`
StartTime int64 `json:"startTime,omitempty"`
ResourceHistory ResourceHistory
`json:"resourceHistory,omitempty"`
+ BackoffDeadline *time.Time
`json:"backoffDeadline,omitempty"`
}
type StateDAOInfo struct {
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index c8d5fc30..a76f76c5 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -352,6 +352,12 @@ func getApplicationDAO(app *objects.Application)
*dao.ApplicationDAOInfo {
PlaceholderResource:
app.GetTrackedDAOMap("placeholderResource"),
}
+ backoffDeadline := app.GetBackoffDeadline()
+ var backoffDeadlinePtr *time.Time
+ if !backoffDeadline.IsZero() {
+ backoffDeadlinePtr = &backoffDeadline
+ }
+
return &dao.ApplicationDAOInfo{
ApplicationID: app.ApplicationID,
UsedResource: app.GetAllocatedResource().DAOMap(),
@@ -374,6 +380,7 @@ func getApplicationDAO(app *objects.Application)
*dao.ApplicationDAOInfo {
MaxRequestPriority: app.GetAskMaxPriority(),
StartTime: app.StartTime().UnixMilli(),
ResourceHistory: resHistory,
+ BackoffDeadline: backoffDeadlinePtr,
}
}
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index e9969393..e72cf8b7 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -1865,6 +1865,14 @@ func TestGetApplicationHandler(t *testing.T) {
assert.Equal(t, appsDao.Partition, "default")
assert.Equal(t, appsDao.QueueName, "root.default")
assert.Equal(t, len(appsDao.Allocations), 0)
+ assert.Assert(t, appsDao.BackoffDeadline == nil)
+ deadline := time.Unix(123, 0)
+ app.SetBackoffDeadline(deadline)
+ resp = &MockResponseWriter{}
+ getApplication(resp, req)
+ err = json.Unmarshal(resp.outputBytes, &appsDao)
+ assert.NilError(t, err, unmarshalError)
+ assert.Assert(t, deadline.Equal(*appsDao.BackoffDeadline))
// test nonexistent partition
var req1 *http.Request
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]