This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new bf04bd44 [YUNIKORN-2772] Scheduler restart does not preserve app start time (#1018)
bf04bd44 is described below

commit bf04bd44bebd3af53092a25c7e242349782c25e4
Author: Peter Bacsko <[email protected]>
AuthorDate: Mon Apr 14 13:45:40 2025 +0200

    [YUNIKORN-2772] Scheduler restart does not preserve app start time (#1018)
    
    Closes: #1018
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/scheduler/objects/application.go      | 23 ++++++---
 pkg/scheduler/objects/application_test.go | 23 +++++++++
 pkg/scheduler/objects/sorters.go          |  8 ++--
 pkg/scheduler/objects/sorters_test.go     |  9 ++--
 pkg/scheduler/partition_test.go           | 78 +++++++++++++++++++++++++++++--
 pkg/scheduler/utilities_test.go           |  9 ++--
 pkg/webservice/handlers.go                |  2 +-
 7 files changed, 129 insertions(+), 23 deletions(-)

diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index adff3817..9eb0e3ed 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -77,10 +77,9 @@ type StateLogEntry struct {
 }
 
 type Application struct {
-       ApplicationID  string            // application ID
-       Partition      string            // partition Name
-       SubmissionTime time.Time         // time application was submitted
-       tags           map[string]string // application tags used in scheduling
+       ApplicationID string            // application ID
+       Partition     string            // partition Name
+       tags          map[string]string // application tags used in scheduling
 
        // Private mutable fields need protection
        queuePath         string
@@ -91,6 +90,7 @@ type Application struct {
        sortedRequests    sortedRequests          // list of requests pre-sorted
        user              security.UserGroup      // owner of the application
        allocatedResource *resources.Resource     // total allocated resources
+       submissionTime    time.Time               // time application was 
submitted (based on the first ask)
 
        usedResource        *resources.TrackedResource // keep track of 
resource usage of the application
        preemptedResource   *resources.TrackedResource // keep track of 
preempted resource usage of the application
@@ -128,7 +128,7 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eve
        app := &Application{
                ApplicationID:         siApp.ApplicationID,
                Partition:             siApp.PartitionName,
-               SubmissionTime:        time.Now(),
+               submissionTime:        time.Now(),
                queuePath:             siApp.QueueName,
                tags:                  siApp.Tags,
                pending:               resources.NewResource(),
@@ -175,7 +175,7 @@ func (sa *Application) String() string {
                return "application is nil"
        }
        return fmt.Sprintf("applicationID: %s, Partition: %s, SubmissionTime: 
%x, State: %s",
-               sa.ApplicationID, sa.Partition, sa.SubmissionTime, 
sa.stateMachine.Current())
+               sa.ApplicationID, sa.Partition, sa.GetSubmissionTime(), 
sa.stateMachine.Current())
 }
 
 func (sa *Application) SetState(state string) {
@@ -597,6 +597,9 @@ func (sa *Application) AddAllocationAsk(ask *Allocation) error {
        if ask.IsAllocated() || resources.IsZero(ask.GetAllocatedResource()) {
                return fmt.Errorf("invalid ask added to app %s: %v", 
sa.ApplicationID, ask)
        }
+       if ask.createTime.Before(sa.submissionTime) {
+               sa.submissionTime = ask.createTime
+       }
        delta := ask.GetAllocatedResource().Clone()
 
        var oldAskResource *resources.Resource = nil
@@ -2099,7 +2102,7 @@ func (sa *Application) GetApplicationSummary(rmID string) *ApplicationSummary {
 func (sa *Application) getApplicationSummary(rmID string) *ApplicationSummary {
        return &ApplicationSummary{
                ApplicationID:       sa.ApplicationID,
-               SubmissionTime:      sa.SubmissionTime,
+               SubmissionTime:      sa.submissionTime,
                StartTime:           sa.startTime,
                FinishTime:          sa.finishedTime,
                User:                sa.user.User,
@@ -2262,3 +2265,9 @@ func (sa *Application) getResourceFromTags(tag string) *resources.Resource {
 
        return resource
 }
+
+func (sa *Application) GetSubmissionTime() time.Time {
+       sa.RLock()
+       defer sa.RUnlock()
+       return sa.submissionTime
+}
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 62c507ed..2c110330 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -3650,3 +3650,26 @@ func TestTryNodesNoReserve(t *testing.T) {
        assert.Equal(t, result.ResultType, AllocatedReserved, "result type 
should be AllocatedReserved")
        assert.Equal(t, result.ReservedNodeID, node1.NodeID, "reserved node 
should be node1")
 }
+
+func TestAppSubmissionTime(t *testing.T) {
+       app := newApplication(appID0, "default", "root.default")
+       queue, err := createRootQueue(map[string]string{"first": "5"})
+       assert.NilError(t, err, "queue create failed")
+       app.queue = queue
+
+       res := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
+       ask1 := newAllocationAsk(aKey, appID1, res)
+       ask1.createTime = time.Unix(0, 100)
+       err = app.AddAllocationAsk(ask1)
+       assert.NilError(t, err)
+       ask2 := newAllocationAsk(aKey2, appID1, res)
+       ask2.createTime = time.Unix(0, 200)
+       err = app.AddAllocationAsk(ask2)
+       assert.NilError(t, err)
+       assert.Equal(t, app.submissionTime, time.Unix(0, 100), "app submission 
time is not set properly")
+       ask3 := newAllocationAsk(aKey3, appID1, res)
+       ask3.createTime = time.Unix(0, 50)
+       err = app.AddAllocationAsk(ask3)
+       assert.NilError(t, err)
+       assert.Equal(t, app.submissionTime, time.Unix(0, 50), "app submission 
time is not set properly")
+}
diff --git a/pkg/scheduler/objects/sorters.go b/pkg/scheduler/objects/sorters.go
index 24f9dec7..723d99d2 100644
--- a/pkg/scheduler/objects/sorters.go
+++ b/pkg/scheduler/objects/sorters.go
@@ -150,10 +150,12 @@ func sortApplicationsBySubmissionTimeAndPriority(sortedApps []*Application) {
        sort.SliceStable(sortedApps, func(i, j int) bool {
                l := sortedApps[i]
                r := sortedApps[j]
-               if l.SubmissionTime.Before(r.SubmissionTime) {
+               lSubTime := l.GetSubmissionTime()
+               rSubTime := r.GetSubmissionTime()
+               if lSubTime.Before(rSubTime) {
                        return true
                }
-               if r.SubmissionTime.Before(l.SubmissionTime) {
+               if rSubTime.Before(lSubTime) {
                        return false
                }
                return l.GetAskMaxPriority() > r.GetAskMaxPriority()
@@ -172,7 +174,7 @@ func sortApplicationsByPriorityAndSubmissionTime(sortedApps []*Application) {
                if leftPriority < rightPriority {
                        return false
                }
-               return l.SubmissionTime.Before(r.SubmissionTime)
+               return l.GetSubmissionTime().Before(r.GetSubmissionTime())
        })
 }
 
diff --git a/pkg/scheduler/objects/sorters_test.go b/pkg/scheduler/objects/sorters_test.go
index 834ab7cf..0d7f0c91 100644
--- a/pkg/scheduler/objects/sorters_test.go
+++ b/pkg/scheduler/objects/sorters_test.go
@@ -226,8 +226,7 @@ func TestSortAppsFifo(t *testing.T) {
                app := newApplication(appID, "partition", "queue")
                app.pending = res
                input[appID] = app
-               // make sure the time stamps differ at least a bit (tracking in 
nano seconds)
-               time.Sleep(time.Nanosecond * 5)
+               app.submissionTime = time.Unix(int64(i), 0)
        }
 
        // fifo - apps should come back in order created 0, 1, 2, 3
@@ -236,8 +235,8 @@ func TestSortAppsFifo(t *testing.T) {
 
        input["app-1"].askMaxPriority = 3
        input["app-3"].askMaxPriority = 5
-       input["app-2"].SubmissionTime = input["app-3"].SubmissionTime
-       input["app-1"].SubmissionTime = input["app-3"].SubmissionTime
+       input["app-2"].submissionTime = input["app-3"].submissionTime
+       input["app-1"].submissionTime = input["app-3"].submissionTime
        list = sortApplications(input, policies.FifoSortPolicy, false, nil)
        /*
        * apps order: 0, 3, 1, 2
@@ -410,7 +409,7 @@ func TestSortBySubmissionTime(t *testing.T) {
                app := newApplication(appID, "partition", "queue")
                app.pending = res
                input[appID] = app
-               app.SubmissionTime = baseline.Add(-time.Minute * 
time.Duration(i))
+               app.submissionTime = baseline.Add(-time.Minute * 
time.Duration(i))
                input[appID] = app
        }
 
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 9df58cec..fb3ee7ac 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -323,7 +323,7 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
        assert.Equal(t, 1, partition.getPhAllocationCount(), "number of active 
placeholders")
 
        // fake an ask that is used
-       ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1, 
false)
+       ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1, 
false, nil)
        err = app.AddAllocationAsk(ask)
        assert.NilError(t, err, "ask should be added to app")
        _, err = app.AllocateAsk(allocKey)
@@ -743,7 +743,7 @@ func TestRemoveNodeWithReplacement(t *testing.T) {
        assert.Equal(t, 2, partition.GetTotalNodeCount(), "node list was not 
updated as expected")
 
        // fake an ask that is used
-       ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1, 
false)
+       ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1, 
false, nil)
        err = app.AddAllocationAsk(ask)
        assert.NilError(t, err, "ask should be added to app")
        _, err = app.AllocateAsk(allocKey)
@@ -817,7 +817,7 @@ func TestRemoveNodeWithReal(t *testing.T) {
        assert.Equal(t, 2, partition.GetTotalNodeCount(), "node list was not 
updated as expected")
 
        // fake an ask that is used
-       ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1, 
false)
+       ask := newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1, 
false, nil)
        err = app.AddAllocationAsk(ask)
        assert.NilError(t, err, "ask should be added to app")
        _, err = app.AllocateAsk(allocKey)
@@ -4800,3 +4800,75 @@ func TestForeignAllocation(t *testing.T) { //nolint:funlen
        assert.Equal(t, 0, len(partition.foreignAllocs))
        assert.Equal(t, 0, len(node.GetYunikornAllocations()))
 }
+
+func TestAppSchedulingOrderFIFO(t *testing.T) {
+       setupUGM()
+       partition, err := newBasePartition()
+       assert.NilError(t, err, "partition create failed")
+       conf := configs.PartitionConfig{
+               Name: "test",
+               Queues: []configs.QueueConfig{
+                       {
+                               Name:      "root",
+                               Parent:    true,
+                               SubmitACL: "*",
+                               Queues: []configs.QueueConfig{
+                                       {
+                                               Name:       "default",
+                                               Parent:     false,
+                                               Properties: 
map[string]string{configs.ApplicationSortPolicy: 
policies.FifoSortPolicy.String()},
+                                       },
+                               },
+                       },
+               },
+               PlacementRules: nil,
+               Limits:         nil,
+               NodeSortPolicy: configs.NodeSortingPolicy{},
+       }
+       err = partition.updatePartitionDetails(conf)
+       assert.NilError(t, err, "unable to update partition config")
+
+       nodeRes := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+       node := newNodeMaxResource(nodeID1, nodeRes)
+       err = partition.AddNode(node)
+       assert.NilError(t, err)
+
+       app1 := newApplication(appID1, "default", defQueue)
+       err = partition.AddApplication(app1)
+       assert.NilError(t, err, "could not add application")
+       app2 := newApplication(appID2, "default", defQueue)
+       err = partition.AddApplication(app2)
+       assert.NilError(t, err, "could not add application")
+
+       // add two asks to app2 first
+       app2AskRes := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2})
+
+       app2Ask1 := newAllocationAskAll(allocKey2, appID2, "", app2AskRes, 0, 
false, map[string]string{
+               siCommon.CreationTime: "100",
+       })
+       err = app2.AddAllocationAsk(app2Ask1)
+       assert.NilError(t, err, "could not add ask")
+       app2Ask2 := newAllocationAskAll(allocKey3, appID2, "", app2AskRes, 0, 
false, map[string]string{
+               siCommon.CreationTime: "50",
+       })
+       err = app2.AddAllocationAsk(app2Ask2)
+       assert.NilError(t, err, "could not add ask")
+
+       askRes1 := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
+       ask1 := newAllocationAskAll(allocKey, appID1, "", askRes1, 0, false, 
map[string]string{
+               siCommon.CreationTime: "1000",
+       })
+       err = app1.AddAllocationAsk(ask1)
+       assert.NilError(t, err, "could not add ask")
+
+       // the two asks from app2 should be scheduled
+       alloc := partition.tryAllocate()
+       assert.Assert(t, alloc != nil, "no allocation was made")
+       assert.Equal(t, allocKey3, alloc.Request.GetAllocationKey())
+       alloc = partition.tryAllocate()
+       assert.Assert(t, alloc != nil, "no allocation was made")
+       assert.Equal(t, allocKey2, alloc.Request.GetAllocationKey())
+       alloc = partition.tryAllocate()
+       assert.Assert(t, alloc != nil, "no allocation was made")
+       assert.Equal(t, allocKey, alloc.Request.GetAllocationKey())
+}
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index d7cbf1f7..8e6a9af3 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -557,18 +557,18 @@ func newApplicationTGTagsWithPhTimeout(appID, partition, queueName string, task
 }
 
 func newAllocationAskTG(allocKey, appID, taskGroup string, res 
*resources.Resource, placeHolder bool) *objects.Allocation {
-       return newAllocationAskAll(allocKey, appID, taskGroup, res, 1, 
placeHolder)
+       return newAllocationAskAll(allocKey, appID, taskGroup, res, 1, 
placeHolder, nil)
 }
 
 func newAllocationAsk(allocKey, appID string, res *resources.Resource) 
*objects.Allocation {
-       return newAllocationAskAll(allocKey, appID, "", res, 1, false)
+       return newAllocationAskAll(allocKey, appID, "", res, 1, false, nil)
 }
 
 func newAllocationAskPriority(allocKey, appID string, res *resources.Resource, 
prio int32) *objects.Allocation {
-       return newAllocationAskAll(allocKey, appID, "", res, prio, false)
+       return newAllocationAskAll(allocKey, appID, "", res, prio, false, nil)
 }
 
-func newAllocationAskAll(allocKey, appID, taskGroup string, res 
*resources.Resource, prio int32, placeHolder bool) *objects.Allocation {
+func newAllocationAskAll(allocKey, appID, taskGroup string, res 
*resources.Resource, prio int32, placeHolder bool, tags map[string]string) 
*objects.Allocation {
        return objects.NewAllocationFromSI(&si.Allocation{
                AllocationKey:    allocKey,
                ApplicationID:    appID,
@@ -577,6 +577,7 @@ func newAllocationAskAll(allocKey, appID, taskGroup string, res *resources.Resou
                Priority:         prio,
                TaskGroupName:    taskGroup,
                Placeholder:      placeHolder,
+               AllocationTags:   tags,
        })
 }
 
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index da1916f2..9248cb15 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -359,7 +359,7 @@ func getApplicationDAO(app *objects.Application) *dao.ApplicationDAOInfo {
                PendingResource:    app.GetPendingResource().DAOMap(),
                Partition:          
common.GetPartitionNameWithoutClusterID(app.Partition),
                QueueName:          app.GetQueuePath(),
-               SubmissionTime:     app.SubmissionTime.UnixNano(),
+               SubmissionTime:     app.GetSubmissionTime().UnixNano(),
                FinishedTime:       
common.ZeroTimeInUnixNano(app.FinishedTime()),
                Requests:           getAllocationAsksDAO(app.GetAllRequests()),
                Allocations:        getAllocationsDAO(app.GetAllAllocations()),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to