This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new bf04bd44 [YUNIKORN-2772] Scheduler restart does not preserve app start
time (#1018)
bf04bd44 is described below
commit bf04bd44bebd3af53092a25c7e242349782c25e4
Author: Peter Bacsko <[email protected]>
AuthorDate: Mon Apr 14 13:45:40 2025 +0200
[YUNIKORN-2772] Scheduler restart does not preserve app start time (#1018)
Closes: #1018
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/scheduler/objects/application.go | 23 ++++++---
pkg/scheduler/objects/application_test.go | 23 +++++++++
pkg/scheduler/objects/sorters.go | 8 ++--
pkg/scheduler/objects/sorters_test.go | 9 ++--
pkg/scheduler/partition_test.go | 78 +++++++++++++++++++++++++++++--
pkg/scheduler/utilities_test.go | 9 ++--
pkg/webservice/handlers.go | 2 +-
7 files changed, 129 insertions(+), 23 deletions(-)
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index adff3817..9eb0e3ed 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -77,10 +77,9 @@ type StateLogEntry struct {
}
type Application struct {
- ApplicationID string // application ID
- Partition string // partition Name
- SubmissionTime time.Time // time application was submitted
- tags map[string]string // application tags used in scheduling
+ ApplicationID string // application ID
+ Partition string // partition Name
+ tags map[string]string // application tags used in scheduling
// Private mutable fields need protection
queuePath string
@@ -91,6 +90,7 @@ type Application struct {
sortedRequests sortedRequests // list of requests pre-sorted
user security.UserGroup // owner of the application
allocatedResource *resources.Resource // total allocated resources
+ submissionTime time.Time // time application was
submitted (earliest ask creation time, or app creation time if no ask is older)
usedResource *resources.TrackedResource // keep track of
resource usage of the application
preemptedResource *resources.TrackedResource // keep track of
preempted resource usage of the application
@@ -128,7 +128,7 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi
security.UserGroup, eve
app := &Application{
ApplicationID: siApp.ApplicationID,
Partition: siApp.PartitionName,
- SubmissionTime: time.Now(),
+ submissionTime: time.Now(),
queuePath: siApp.QueueName,
tags: siApp.Tags,
pending: resources.NewResource(),
@@ -175,7 +175,7 @@ func (sa *Application) String() string {
return "application is nil"
}
return fmt.Sprintf("applicationID: %s, Partition: %s, SubmissionTime:
%x, State: %s",
- sa.ApplicationID, sa.Partition, sa.SubmissionTime,
sa.stateMachine.Current())
+ sa.ApplicationID, sa.Partition, sa.GetSubmissionTime(),
sa.stateMachine.Current())
}
func (sa *Application) SetState(state string) {
@@ -597,6 +597,9 @@ func (sa *Application) AddAllocationAsk(ask *Allocation)
error {
if ask.IsAllocated() || resources.IsZero(ask.GetAllocatedResource()) {
return fmt.Errorf("invalid ask added to app %s: %v",
sa.ApplicationID, ask)
}
+ if ask.createTime.Before(sa.submissionTime) {
+ sa.submissionTime = ask.createTime
+ }
delta := ask.GetAllocatedResource().Clone()
var oldAskResource *resources.Resource = nil
@@ -2099,7 +2102,7 @@ func (sa *Application) GetApplicationSummary(rmID string)
*ApplicationSummary {
func (sa *Application) getApplicationSummary(rmID string) *ApplicationSummary {
return &ApplicationSummary{
ApplicationID: sa.ApplicationID,
- SubmissionTime: sa.SubmissionTime,
+ SubmissionTime: sa.submissionTime,
StartTime: sa.startTime,
FinishTime: sa.finishedTime,
User: sa.user.User,
@@ -2262,3 +2265,9 @@ func (sa *Application) getResourceFromTags(tag string)
*resources.Resource {
return resource
}
+
+func (sa *Application) GetSubmissionTime() time.Time {
+ sa.RLock()
+ defer sa.RUnlock()
+ return sa.submissionTime
+}
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index 62c507ed..2c110330 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -3650,3 +3650,26 @@ func TestTryNodesNoReserve(t *testing.T) {
assert.Equal(t, result.ResultType, AllocatedReserved, "result type
should be AllocatedReserved")
assert.Equal(t, result.ReservedNodeID, node1.NodeID, "reserved node
should be node1")
}
+
+func TestAppSubmissionTime(t *testing.T) {
+ app := newApplication(appID0, "default", "root.default")
+ queue, err := createRootQueue(map[string]string{"first": "5"})
+ assert.NilError(t, err, "queue create failed")
+ app.queue = queue
+
+ res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
+ ask1 := newAllocationAsk(aKey, appID1, res)
+ ask1.createTime = time.Unix(0, 100)
+ err = app.AddAllocationAsk(ask1)
+ assert.NilError(t, err)
+ ask2 := newAllocationAsk(aKey2, appID1, res)
+ ask2.createTime = time.Unix(0, 200)
+ err = app.AddAllocationAsk(ask2)
+ assert.NilError(t, err)
+ assert.Equal(t, app.submissionTime, time.Unix(0, 100), "app submission
time is not set properly")
+ ask3 := newAllocationAsk(aKey3, appID1, res)
+ ask3.createTime = time.Unix(0, 50)
+ err = app.AddAllocationAsk(ask3)
+ assert.NilError(t, err)
+ assert.Equal(t, app.submissionTime, time.Unix(0, 50), "app submission
time is not set properly")
+}
diff --git a/pkg/scheduler/objects/sorters.go b/pkg/scheduler/objects/sorters.go
index 24f9dec7..723d99d2 100644
--- a/pkg/scheduler/objects/sorters.go
+++ b/pkg/scheduler/objects/sorters.go
@@ -150,10 +150,12 @@ func
sortApplicationsBySubmissionTimeAndPriority(sortedApps []*Application) {
sort.SliceStable(sortedApps, func(i, j int) bool {
l := sortedApps[i]
r := sortedApps[j]
- if l.SubmissionTime.Before(r.SubmissionTime) {
+ lSubTime := l.GetSubmissionTime()
+ rSubTime := r.GetSubmissionTime()
+ if lSubTime.Before(rSubTime) {
return true
}
- if r.SubmissionTime.Before(l.SubmissionTime) {
+ if rSubTime.Before(lSubTime) {
return false
}
return l.GetAskMaxPriority() > r.GetAskMaxPriority()
@@ -172,7 +174,7 @@ func sortApplicationsByPriorityAndSubmissionTime(sortedApps
[]*Application) {
if leftPriority < rightPriority {
return false
}
- return l.SubmissionTime.Before(r.SubmissionTime)
+ return l.GetSubmissionTime().Before(r.GetSubmissionTime())
})
}
diff --git a/pkg/scheduler/objects/sorters_test.go
b/pkg/scheduler/objects/sorters_test.go
index 834ab7cf..0d7f0c91 100644
--- a/pkg/scheduler/objects/sorters_test.go
+++ b/pkg/scheduler/objects/sorters_test.go
@@ -226,8 +226,7 @@ func TestSortAppsFifo(t *testing.T) {
app := newApplication(appID, "partition", "queue")
app.pending = res
input[appID] = app
- // make sure the time stamps differ at least a bit (tracking in
nano seconds)
- time.Sleep(time.Nanosecond * 5)
+ app.submissionTime = time.Unix(int64(i), 0)
}
// fifo - apps should come back in order created 0, 1, 2, 3
@@ -236,8 +235,8 @@ func TestSortAppsFifo(t *testing.T) {
input["app-1"].askMaxPriority = 3
input["app-3"].askMaxPriority = 5
- input["app-2"].SubmissionTime = input["app-3"].SubmissionTime
- input["app-1"].SubmissionTime = input["app-3"].SubmissionTime
+ input["app-2"].submissionTime = input["app-3"].submissionTime
+ input["app-1"].submissionTime = input["app-3"].submissionTime
list = sortApplications(input, policies.FifoSortPolicy, false, nil)
/*
* apps order: 0, 3, 1, 2
@@ -410,7 +409,7 @@ func TestSortBySubmissionTime(t *testing.T) {
app := newApplication(appID, "partition", "queue")
app.pending = res
input[appID] = app
- app.SubmissionTime = baseline.Add(-time.Minute *
time.Duration(i))
+ app.submissionTime = baseline.Add(-time.Minute *
time.Duration(i))
input[appID] = app
}
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 9df58cec..fb3ee7ac 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -323,7 +323,7 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
assert.Equal(t, 1, partition.getPhAllocationCount(), "number of active
placeholders")
// fake an ask that is used
- ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1,
false)
+ ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1,
false, nil)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should be added to app")
_, err = app.AllocateAsk(allocKey)
@@ -743,7 +743,7 @@ func TestRemoveNodeWithReplacement(t *testing.T) {
assert.Equal(t, 2, partition.GetTotalNodeCount(), "node list was not
updated as expected")
// fake an ask that is used
- ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1,
false)
+ ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1,
false, nil)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should be added to app")
_, err = app.AllocateAsk(allocKey)
@@ -817,7 +817,7 @@ func TestRemoveNodeWithReal(t *testing.T) {
assert.Equal(t, 2, partition.GetTotalNodeCount(), "node list was not
updated as expected")
// fake an ask that is used
- ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1,
false)
+ ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1,
false, nil)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should be added to app")
_, err = app.AllocateAsk(allocKey)
@@ -4800,3 +4800,75 @@ func TestForeignAllocation(t *testing.T) {
//nolint:funlen
assert.Equal(t, 0, len(partition.foreignAllocs))
assert.Equal(t, 0, len(node.GetYunikornAllocations()))
}
+
+func TestAppSchedulingOrderFIFO(t *testing.T) {
+ setupUGM()
+ partition, err := newBasePartition()
+ assert.NilError(t, err, "partition create failed")
+ conf := configs.PartitionConfig{
+ Name: "test",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "root",
+ Parent: true,
+ SubmitACL: "*",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "default",
+ Parent: false,
+ Properties:
map[string]string{configs.ApplicationSortPolicy:
policies.FifoSortPolicy.String()},
+ },
+ },
+ },
+ },
+ PlacementRules: nil,
+ Limits: nil,
+ NodeSortPolicy: configs.NodeSortingPolicy{},
+ }
+ err = partition.updatePartitionDetails(conf)
+ assert.NilError(t, err, "unable to update partition config")
+
+ nodeRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+ node := newNodeMaxResource(nodeID1, nodeRes)
+ err = partition.AddNode(node)
+ assert.NilError(t, err)
+
+ app1 := newApplication(appID1, "default", defQueue)
+ err = partition.AddApplication(app1)
+ assert.NilError(t, err, "could not add application")
+ app2 := newApplication(appID2, "default", defQueue)
+ err = partition.AddApplication(app2)
+ assert.NilError(t, err, "could not add application")
+
+ // add two asks to app2 first
+ app2AskRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2})
+
+ app2Ask1 := newAllocationAskAll(allocKey2, appID2, "", app2AskRes, 0,
false, map[string]string{
+ siCommon.CreationTime: "100",
+ })
+ err = app2.AddAllocationAsk(app2Ask1)
+ assert.NilError(t, err, "could not add ask")
+ app2Ask2 := newAllocationAskAll(allocKey3, appID2, "", app2AskRes, 0,
false, map[string]string{
+ siCommon.CreationTime: "50",
+ })
+ err = app2.AddAllocationAsk(app2Ask2)
+ assert.NilError(t, err, "could not add ask")
+
+ askRes1 :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
+ ask1 := newAllocationAskAll(allocKey, appID1, "", askRes1, 0, false,
map[string]string{
+ siCommon.CreationTime: "1000",
+ })
+ err = app1.AddAllocationAsk(ask1)
+ assert.NilError(t, err, "could not add ask")
+
+ // the two asks from app2 should be scheduled first (allocKey3, then allocKey2), followed by the ask from app1
+ alloc := partition.tryAllocate()
+ assert.Assert(t, alloc != nil, "no allocation was made")
+ assert.Equal(t, allocKey3, alloc.Request.GetAllocationKey())
+ alloc = partition.tryAllocate()
+ assert.Assert(t, alloc != nil, "no allocation was made")
+ assert.Equal(t, allocKey2, alloc.Request.GetAllocationKey())
+ alloc = partition.tryAllocate()
+ assert.Assert(t, alloc != nil, "no allocation was made")
+ assert.Equal(t, allocKey, alloc.Request.GetAllocationKey())
+}
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index d7cbf1f7..8e6a9af3 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -557,18 +557,18 @@ func newApplicationTGTagsWithPhTimeout(appID, partition,
queueName string, task
}
func newAllocationAskTG(allocKey, appID, taskGroup string, res
*resources.Resource, placeHolder bool) *objects.Allocation {
- return newAllocationAskAll(allocKey, appID, taskGroup, res, 1,
placeHolder)
+ return newAllocationAskAll(allocKey, appID, taskGroup, res, 1,
placeHolder, nil)
}
func newAllocationAsk(allocKey, appID string, res *resources.Resource)
*objects.Allocation {
- return newAllocationAskAll(allocKey, appID, "", res, 1, false)
+ return newAllocationAskAll(allocKey, appID, "", res, 1, false, nil)
}
func newAllocationAskPriority(allocKey, appID string, res *resources.Resource,
prio int32) *objects.Allocation {
- return newAllocationAskAll(allocKey, appID, "", res, prio, false)
+ return newAllocationAskAll(allocKey, appID, "", res, prio, false, nil)
}
-func newAllocationAskAll(allocKey, appID, taskGroup string, res
*resources.Resource, prio int32, placeHolder bool) *objects.Allocation {
+func newAllocationAskAll(allocKey, appID, taskGroup string, res
*resources.Resource, prio int32, placeHolder bool, tags map[string]string)
*objects.Allocation {
return objects.NewAllocationFromSI(&si.Allocation{
AllocationKey: allocKey,
ApplicationID: appID,
@@ -577,6 +577,7 @@ func newAllocationAskAll(allocKey, appID, taskGroup string,
res *resources.Resou
Priority: prio,
TaskGroupName: taskGroup,
Placeholder: placeHolder,
+ AllocationTags: tags,
})
}
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index da1916f2..9248cb15 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -359,7 +359,7 @@ func getApplicationDAO(app *objects.Application)
*dao.ApplicationDAOInfo {
PendingResource: app.GetPendingResource().DAOMap(),
Partition:
common.GetPartitionNameWithoutClusterID(app.Partition),
QueueName: app.GetQueuePath(),
- SubmissionTime: app.SubmissionTime.UnixNano(),
+ SubmissionTime: app.GetSubmissionTime().UnixNano(),
FinishedTime:
common.ZeroTimeInUnixNano(app.FinishedTime()),
Requests: getAllocationAsksDAO(app.GetAllRequests()),
Allocations: getAllocationsDAO(app.GetAllAllocations()),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]