This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new c0140a98 [YUNIKORN-2151] Report resource used by placeholder pods in the app summary (#715)
c0140a98 is described below

commit c0140a984db9063867b41cae76bd9191180500cd
Author: qzhu <[email protected]>
AuthorDate: Thu Nov 16 16:32:12 2023 +0100

    [YUNIKORN-2151] Report resource used by placeholder pods in the app summary (#715)
    
    Closes: #715
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/scheduler/objects/application.go      | 69 ++++++++++++++++++++-----------
 pkg/scheduler/objects/application_test.go | 37 ++++++++++++-----
 2 files changed, 71 insertions(+), 35 deletions(-)

diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index da3ca668..3e27613d 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -87,8 +87,9 @@ type Application struct {
        user              security.UserGroup        // owner of the application
        allocatedResource *resources.Resource       // total allocated resources
 
-       usedResource      *resources.TrackedResource // keep track of resource 
usage of the application
-       preemptedResource *resources.TrackedResource // keep track of preempted 
resource usage of the application
+       usedResource        *resources.TrackedResource // keep track of 
resource usage of the application
+       preemptedResource   *resources.TrackedResource // keep track of 
preempted resource usage of the application
+       placeholderResource *resources.TrackedResource // keep track of 
placeholder resource usage of the application
 
        maxAllocatedResource *resources.Resource         // max allocated 
resources
        allocatedPlaceholder *resources.Resource         // total allocated 
placeholder resources
@@ -118,16 +119,17 @@ type Application struct {
 }
 
 type ApplicationSummary struct {
-       ApplicationID     string
-       SubmissionTime    time.Time
-       StartTime         time.Time
-       FinishTime        time.Time
-       User              string
-       Queue             string
-       State             string
-       RmID              string
-       ResourceUsage     *resources.TrackedResource
-       PreemptedResource *resources.TrackedResource
+       ApplicationID       string
+       SubmissionTime      time.Time
+       StartTime           time.Time
+       FinishTime          time.Time
+       User                string
+       Queue               string
+       State               string
+       RmID                string
+       ResourceUsage       *resources.TrackedResource
+       PreemptedResource   *resources.TrackedResource
+       PlaceholderResource *resources.TrackedResource
 }
 
 func (as *ApplicationSummary) DoLogging() {
@@ -142,6 +144,7 @@ func (as *ApplicationSummary) DoLogging() {
                zap.String("rmID", as.RmID),
                zap.Any("resourceUsage", as.ResourceUsage.TrackedResourceMap),
                zap.Any("preemptedResource", 
as.PreemptedResource.TrackedResourceMap),
+               zap.Any("placeHolderResource", 
as.PlaceholderResource.TrackedResourceMap),
        )
 }
 
@@ -149,19 +152,21 @@ func (sa *Application) GetApplicationSummary(rmID string) 
*ApplicationSummary {
        sa.RLock()
        defer sa.RUnlock()
        state := sa.stateMachine.Current()
-       ru := sa.usedResource.Clone()
-       pu := sa.preemptedResource.Clone()
+       resourceUsage := sa.usedResource.Clone()
+       preemptedUsage := sa.preemptedResource.Clone()
+       placeHolderUsage := sa.placeholderResource.Clone()
        appSummary := &ApplicationSummary{
-               ApplicationID:     sa.ApplicationID,
-               SubmissionTime:    sa.SubmissionTime,
-               StartTime:         sa.startTime,
-               FinishTime:        sa.finishedTime,
-               User:              sa.user.User,
-               Queue:             sa.queuePath,
-               State:             state,
-               RmID:              rmID,
-               ResourceUsage:     ru,
-               PreemptedResource: pu,
+               ApplicationID:       sa.ApplicationID,
+               SubmissionTime:      sa.SubmissionTime,
+               StartTime:           sa.startTime,
+               FinishTime:          sa.finishedTime,
+               User:                sa.user.User,
+               Queue:               sa.queuePath,
+               State:               state,
+               RmID:                rmID,
+               ResourceUsage:       resourceUsage,
+               PreemptedResource:   preemptedUsage,
+               PlaceholderResource: placeHolderUsage,
        }
        return appSummary
 }
@@ -177,6 +182,7 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi 
security.UserGroup, eve
                allocatedResource:     resources.NewResource(),
                usedResource:          resources.NewTrackedResource(),
                preemptedResource:     resources.NewTrackedResource(),
+               placeholderResource:   resources.NewTrackedResource(),
                maxAllocatedResource:  resources.NewResource(),
                allocatedPlaceholder:  resources.NewResource(),
                requests:              make(map[string]*AllocationAsk),
@@ -1684,6 +1690,8 @@ func (sa *Application) decUserResourceUsage(resource 
*resources.Resource, remove
 func (sa *Application) trackCompletedResource(info *Allocation) {
        if info.IsPreempted() {
                sa.updatePreemptedResource(info)
+       } else if info.IsPlaceholder() {
+               sa.updatePlaceholderResource(info)
        } else {
                sa.updateUsedResource(info)
        }
@@ -1696,6 +1704,13 @@ func (sa *Application) updateUsedResource(info 
*Allocation) {
                info.GetAllocatedResource(), info.GetBindTime())
 }
 
+// When the placeholder allocated with this allocation is to be removed,
+// have the placeholderResource to aggregate the resource used by this 
allocation
+func (sa *Application) updatePlaceholderResource(info *Allocation) {
+       sa.placeholderResource.AggregateTrackedResource(info.GetInstanceType(),
+               info.GetAllocatedResource(), info.GetBindTime())
+}
+
 // When the resource allocated with this allocation is to be preempted,
 // have the preemptedResource to aggregate the resource used by this allocation
 func (sa *Application) updatePreemptedResource(info *Allocation) {
@@ -1791,6 +1806,9 @@ func (sa *Application) removeAllocationInternal(uuid 
string, releaseType si.Term
                                eventWarning = "Application state not changed 
while removing a placeholder allocation"
                        }
                }
+               // Aggregate the resources used by this alloc to the 
application's resource tracker
+               sa.trackCompletedResource(alloc)
+
                sa.decUserResourceUsage(alloc.GetAllocatedResource(), removeApp)
        } else {
                sa.allocatedResource = resources.Sub(sa.allocatedResource, 
alloc.GetAllocatedResource())
@@ -2033,6 +2051,7 @@ func (sa *Application) cleanupAsks() {
 
 func (sa *Application) cleanupTrackedResource() {
        sa.usedResource = nil
+       sa.placeholderResource = nil
        sa.preemptedResource = nil
 }
 
@@ -2049,6 +2068,8 @@ func (sa *Application) LogAppSummary(rmID string) {
        appSummary := sa.GetApplicationSummary(rmID)
        appSummary.DoLogging()
        appSummary.ResourceUsage = nil
+       appSummary.PreemptedResource = nil
+       appSummary.PlaceholderResource = nil
 }
 
 func (sa *Application) HasPlaceholderAllocation() bool {
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 2dae255f..d52b86cd 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -1067,6 +1067,13 @@ func assertResourceUsage(t *testing.T, appSummary 
*ApplicationSummary, memorySec
        assert.Equal(t, vcoresSecconds, detailedResource["vcores"])
 }
 
+func assertPlaceHolderResource(t *testing.T, appSummary *ApplicationSummary, 
memorySeconds int64,
+       vcoresSecconds int64) {
+       detailedResource := 
appSummary.PlaceholderResource.TrackedResourceMap[instType1]
+       assert.Equal(t, memorySeconds, detailedResource["memory"])
+       assert.Equal(t, vcoresSecconds, detailedResource["vcores"])
+}
+
 func TestResourceUsageAggregation(t *testing.T) {
        setupUGM()
 
@@ -1082,7 +1089,10 @@ func TestResourceUsageAggregation(t *testing.T) {
        assert.NilError(t, err, "failed to create resource with error")
        alloc := newAllocation(appID1, "uuid-1", nodeID1, res)
        alloc.SetInstanceType(instType1)
+       // Mock the time to be 3 seconds before
+       alloc.SetBindTime(time.Now().Add(-3 * time.Second))
        app.AddAllocation(alloc)
+
        if !resources.Equals(app.allocatedResource, res) {
                t.Errorf("allocated resources is not updated correctly: %v", 
app.allocatedResource)
        }
@@ -1094,8 +1104,6 @@ func TestResourceUsageAggregation(t *testing.T) {
        err = app.HandleApplicationEvent(RunApplication)
        assert.NilError(t, err, "no error expected new to accepted (completed 
test)")
 
-       time.Sleep(3 * time.Second)
-
        appSummary := app.GetApplicationSummary("default")
        appSummary.DoLogging()
        assertResourceUsage(t, appSummary, 0, 0)
@@ -1103,11 +1111,12 @@ func TestResourceUsageAggregation(t *testing.T) {
        // add more allocations to test the removals
        alloc = newAllocation(appID1, "uuid-2", nodeID1, res)
        alloc.SetInstanceType(instType1)
+
+       // Mock the time to be 3 seconds before
+       alloc.SetBindTime(time.Now().Add(-3 * time.Second))
        app.AddAllocation(alloc)
        assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 
2))
 
-       time.Sleep(3 * time.Second)
-
        // remove one of the 2
        if alloc = app.RemoveAllocation("uuid-2", 
si.TerminationType_UNKNOWN_TERMINATION_TYPE); alloc == nil {
                t.Error("returned allocations was nil allocation was not 
removed")
@@ -1125,8 +1134,6 @@ func TestResourceUsageAggregation(t *testing.T) {
        assert.Equal(t, len(allocs), 2)
        assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 
2))
 
-       time.Sleep(3 * time.Second)
-
        appSummary = app.GetApplicationSummary("default")
        appSummary.DoLogging()
        assertResourceUsage(t, appSummary, 300, 30)
@@ -1136,8 +1143,6 @@ func TestResourceUsageAggregation(t *testing.T) {
                t.Errorf("returned allocations was not allocation was 
incorrectly removed: %v", alloc)
        }
 
-       time.Sleep(3 * time.Second)
-
        // remove all left over allocations
        if allocs = app.RemoveAllAllocations(); allocs == nil || len(allocs) != 
2 {
                t.Errorf("returned number of allocations was incorrect: %v", 
allocs)
@@ -1151,7 +1156,7 @@ func TestResourceUsageAggregation(t *testing.T) {
 
        appSummary = app.GetApplicationSummary("default")
        appSummary.DoLogging()
-       assertResourceUsage(t, appSummary, 2100, 210)
+       assertResourceUsage(t, appSummary, 600, 60)
 }
 
 func TestRejected(t *testing.T) {
@@ -1332,6 +1337,9 @@ func TestReplaceAllocationTracking(t *testing.T) {
        ph1 := newPlaceholderAlloc(appID1, "uuid-1", nodeID1, res)
        ph2 := newPlaceholderAlloc(appID1, "uuid-2", nodeID1, res)
        ph3 := newPlaceholderAlloc(appID1, "uuid-3", nodeID1, res)
+       ph1.SetInstanceType(instType1)
+       ph2.SetInstanceType(instType1)
+       ph3.SetInstanceType(instType1)
        app.AddAllocation(ph1)
        assert.NilError(t, err, "could not add ask")
        app.addPlaceholderDataWithLocking(ph1.GetAsk())
@@ -1343,6 +1351,10 @@ func TestReplaceAllocationTracking(t *testing.T) {
        assert.NilError(t, err, "could not add ask")
        app.addPlaceholderDataWithLocking(ph3.GetAsk())
 
+       ph1.SetBindTime(time.Now().Add(-10 * time.Second))
+       ph2.SetBindTime(time.Now().Add(-10 * time.Second))
+       ph3.SetBindTime(time.Now().Add(-10 * time.Second))
+
        // replace placeholders
        realAlloc1 := newAllocation(appID1, "uuid-100", nodeID1, res)
        realAlloc1.SetResult(Replaced)
@@ -1365,6 +1377,10 @@ func TestReplaceAllocationTracking(t *testing.T) {
        app.RemoveAllocation("uuid-3", si.TerminationType_PLACEHOLDER_REPLACED)
        assert.Equal(t, "uuid-3", alloc.uuid)
        assert.Equal(t, false, app.HasPlaceholderAllocation())
+
+       // check placeholder resource usage
+       appSummary := app.GetApplicationSummary("default")
+       assertPlaceHolderResource(t, appSummary, 3000, 300)
 }
 
 func TestTimeoutPlaceholderSoftStyle(t *testing.T) {
@@ -1704,8 +1720,7 @@ func TestFinishedTime(t *testing.T) {
        assert.Assert(t, app.finishedTime.IsZero())
        assert.Assert(t, app.FinishedTime().IsZero())
 
-       // sleep 1 second to make finished time bigger than zero
-       time.Sleep(1 * time.Second)
+       // Don't need sleep here, anytime finished, we will set finishedTime 
for now
        app.UnSetQueue()
        assert.Assert(t, !app.finishedTime.IsZero())
        assert.Assert(t, !app.FinishedTime().IsZero())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to