This is an automated email from the ASF dual-hosted git repository.

wilfreds pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/branch-1.4 by this push:
     new 7c0b0d94 [YUNIKORN-1848] Report resource used by preempted pods in the app summary (#649)
7c0b0d94 is described below

commit 7c0b0d94165c23791423684da70cd666c18bc8ef
Author: Yongjun Zhang <[email protected]>
AuthorDate: Mon Nov 13 17:01:13 2023 +0100

    [YUNIKORN-1848] Report resource used by preempted pods in the app summary (#649)
    
    Closes: #649
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/common/resources/resources.go          | 22 ++++----
 pkg/scheduler/objects/application.go       | 86 ++++++++++++++++++++----------
 pkg/scheduler/objects/application_state.go |  2 +-
 pkg/scheduler/objects/application_test.go  |  2 +-
 pkg/scheduler/partition.go                 |  2 +-
 pkg/scheduler/partition_test.go            | 30 ++++++++++-
 6 files changed, 100 insertions(+), 44 deletions(-)

diff --git a/pkg/common/resources/resources.go b/pkg/common/resources/resources.go
index bf96e98f..52a107cb 100644
--- a/pkg/common/resources/resources.go
+++ b/pkg/common/resources/resources.go
@@ -47,43 +47,43 @@ func (q Quantity) string() string {
 }
 
 // Util struct to keep track of application resource usage
-type UsedResource struct {
+type TrackedResource struct {
        // Two level map for aggregated resource usage
        // With instance type being the top level key, the mapped value is a 
map:
        //   resource type (CPU, memory etc) -> the aggregated used time (in 
seconds) of the resource type
        //
-       UsedResourceMap map[string]map[string]int64
+       TrackedResourceMap map[string]map[string]int64
 
        sync.RWMutex
 }
 
-func (ur *UsedResource) Clone() *UsedResource {
+func (ur *TrackedResource) Clone() *TrackedResource {
        if ur == nil {
                return nil
        }
-       ret := NewUsedResource()
+       ret := NewTrackedResource()
        ur.RLock()
        defer ur.RUnlock()
-       for k, v := range ur.UsedResourceMap {
+       for k, v := range ur.TrackedResourceMap {
                dest := make(map[string]int64)
                for key, element := range v {
                        dest[key] = element
                }
-               ret.UsedResourceMap[k] = dest
+               ret.TrackedResourceMap[k] = dest
        }
        return ret
 }
 
 // Aggregate the resource usage to UsedResourceMap[instType]
 // The time the given resource used is the delta between the resource 
createTime and currentTime
-func (ur *UsedResource) AggregateUsedResource(instType string,
+func (ur *TrackedResource) AggregateTrackedResource(instType string,
        resource *Resource, bindTime time.Time) {
        ur.Lock()
        defer ur.Unlock()
 
        releaseTime := time.Now()
        timeDiff := int64(releaseTime.Sub(bindTime).Seconds())
-       aggregatedResourceTime, ok := ur.UsedResourceMap[instType]
+       aggregatedResourceTime, ok := ur.TrackedResourceMap[instType]
        if !ok {
                aggregatedResourceTime = map[string]int64{}
        }
@@ -95,7 +95,7 @@ func (ur *UsedResource) AggregateUsedResource(instType string,
                curUsage += int64(element) * timeDiff // resource size times 
timeDiff
                aggregatedResourceTime[key] = curUsage
        }
-       ur.UsedResourceMap[instType] = aggregatedResourceTime
+       ur.TrackedResourceMap[instType] = aggregatedResourceTime
 }
 
 // Never update value of Zero
@@ -157,8 +157,8 @@ func NewResourceFromConf(configMap map[string]string) 
(*Resource, error) {
        return res, nil
 }
 
-func NewUsedResource() *UsedResource {
-       return &UsedResource{UsedResourceMap: make(map[string]map[string]int64)}
+func NewTrackedResource() *TrackedResource {
+       return &TrackedResource{TrackedResourceMap: 
make(map[string]map[string]int64)}
 }
 
 func (r *Resource) String() string {
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index 7eb739c4..da3ca668 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -87,7 +87,8 @@ type Application struct {
        user              security.UserGroup        // owner of the application
        allocatedResource *resources.Resource       // total allocated resources
 
-       usedResource *resources.UsedResource // keep track of resource usage of 
the application
+       usedResource      *resources.TrackedResource // keep track of resource 
usage of the application
+       preemptedResource *resources.TrackedResource // keep track of preempted 
resource usage of the application
 
        maxAllocatedResource *resources.Resource         // max allocated 
resources
        allocatedPlaceholder *resources.Resource         // total allocated 
placeholder resources
@@ -117,15 +118,16 @@ type Application struct {
 }
 
 type ApplicationSummary struct {
-       ApplicationID  string
-       SubmissionTime time.Time
-       StartTime      time.Time
-       FinishTime     time.Time
-       User           string
-       Queue          string
-       State          string
-       RmID           string
-       ResourceUsage  *resources.UsedResource
+       ApplicationID     string
+       SubmissionTime    time.Time
+       StartTime         time.Time
+       FinishTime        time.Time
+       User              string
+       Queue             string
+       State             string
+       RmID              string
+       ResourceUsage     *resources.TrackedResource
+       PreemptedResource *resources.TrackedResource
 }
 
 func (as *ApplicationSummary) DoLogging() {
@@ -138,24 +140,28 @@ func (as *ApplicationSummary) DoLogging() {
                zap.String("queue", as.Queue),
                zap.String("state", as.State),
                zap.String("rmID", as.RmID),
-               zap.Any("resourceUsage", as.ResourceUsage.UsedResourceMap))
+               zap.Any("resourceUsage", as.ResourceUsage.TrackedResourceMap),
+               zap.Any("preemptedResource", 
as.PreemptedResource.TrackedResourceMap),
+       )
 }
 
 func (sa *Application) GetApplicationSummary(rmID string) *ApplicationSummary {
-       state := sa.stateMachine.Current()
-       ru := sa.usedResource.Clone()
        sa.RLock()
        defer sa.RUnlock()
+       state := sa.stateMachine.Current()
+       ru := sa.usedResource.Clone()
+       pu := sa.preemptedResource.Clone()
        appSummary := &ApplicationSummary{
-               ApplicationID:  sa.ApplicationID,
-               SubmissionTime: sa.SubmissionTime,
-               StartTime:      sa.startTime,
-               FinishTime:     sa.finishedTime,
-               User:           sa.user.User,
-               Queue:          sa.queuePath,
-               State:          state,
-               RmID:           rmID,
-               ResourceUsage:  ru,
+               ApplicationID:     sa.ApplicationID,
+               SubmissionTime:    sa.SubmissionTime,
+               StartTime:         sa.startTime,
+               FinishTime:        sa.finishedTime,
+               User:              sa.user.User,
+               Queue:             sa.queuePath,
+               State:             state,
+               RmID:              rmID,
+               ResourceUsage:     ru,
+               PreemptedResource: pu,
        }
        return appSummary
 }
@@ -169,7 +175,8 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi 
security.UserGroup, eve
                tags:                  siApp.Tags,
                pending:               resources.NewResource(),
                allocatedResource:     resources.NewResource(),
-               usedResource:          resources.NewUsedResource(),
+               usedResource:          resources.NewTrackedResource(),
+               preemptedResource:     resources.NewTrackedResource(),
                maxAllocatedResource:  resources.NewResource(),
                allocatedPlaceholder:  resources.NewResource(),
                requests:              make(map[string]*AllocationAsk),
@@ -1673,10 +1680,26 @@ func (sa *Application) decUserResourceUsage(resource 
*resources.Resource, remove
        ugm.GetUserManager().DecreaseTrackedResource(sa.queuePath, 
sa.ApplicationID, resource, sa.user, removeApp)
 }
 
+// Track used and preempted resources
+func (sa *Application) trackCompletedResource(info *Allocation) {
+       if info.IsPreempted() {
+               sa.updatePreemptedResource(info)
+       } else {
+               sa.updateUsedResource(info)
+       }
+}
+
 // When the resource allocated with this allocation is to be removed,
 // have the usedResource to aggregate the resource used by this allocation
 func (sa *Application) updateUsedResource(info *Allocation) {
-       sa.usedResource.AggregateUsedResource(info.GetInstanceType(),
+       sa.usedResource.AggregateTrackedResource(info.GetInstanceType(),
+               info.GetAllocatedResource(), info.GetBindTime())
+}
+
+// When the resource allocated with this allocation is to be preempted,
+// have the preemptedResource to aggregate the resource used by this allocation
+func (sa *Application) updatePreemptedResource(info *Allocation) {
+       sa.preemptedResource.AggregateTrackedResource(info.GetInstanceType(),
                info.GetAllocatedResource(), info.GetBindTime())
 }
 
@@ -1772,8 +1795,8 @@ func (sa *Application) removeAllocationInternal(uuid 
string, releaseType si.Term
        } else {
                sa.allocatedResource = resources.Sub(sa.allocatedResource, 
alloc.GetAllocatedResource())
 
-               // Aggregate the resources used by this alloc to the 
application's user resource tracker
-               sa.updateUsedResource(alloc)
+               // Aggregate the resources used by this alloc to the 
application's resource tracker
+               sa.trackCompletedResource(alloc)
 
                // When the resource trackers are zero we should not expect 
anything to come in later.
                if sa.hasZeroAllocations() {
@@ -1825,7 +1848,7 @@ func (sa *Application) RemoveAllAllocations() 
[]*Allocation {
        for _, alloc := range sa.allocations {
                allocationsToRelease = append(allocationsToRelease, alloc)
                // Aggregate the resources used by this alloc to the 
application's user resource tracker
-               sa.updateUsedResource(alloc)
+               sa.trackCompletedResource(alloc)
                sa.appEvents.sendRemoveAllocationEvent(alloc, 
si.TerminationType_STOPPED_BY_RM)
        }
 
@@ -2008,8 +2031,15 @@ func (sa *Application) cleanupAsks() {
        sa.sortedRequests = nil
 }
 
-func (sa *Application) CleanupUsedResource() {
+func (sa *Application) cleanupTrackedResource() {
        sa.usedResource = nil
+       sa.preemptedResource = nil
+}
+
+func (sa *Application) CleanupTrackedResource() {
+       sa.Lock()
+       defer sa.Unlock()
+       sa.cleanupTrackedResource()
 }
 
 func (sa *Application) LogAppSummary(rmID string) {
diff --git a/pkg/scheduler/objects/application_state.go b/pkg/scheduler/objects/application_state.go
index 925d80f4..03d427f9 100644
--- a/pkg/scheduler/objects/application_state.go
+++ b/pkg/scheduler/objects/application_state.go
@@ -199,7 +199,7 @@ func NewAppState() *fsm.FSM {
                                
metrics.GetSchedulerMetrics().IncTotalApplicationsRejected()
                                app.setStateTimer(terminatedTimeout, 
app.stateMachine.Current(), ExpireApplication)
                                app.finishedTime = time.Now()
-                               app.CleanupUsedResource()
+                               app.cleanupTrackedResource()
                                // No rejected message when use 
app.HandleApplicationEvent(RejectApplication)
                                if len(event.Args) == 2 {
                                        app.rejectedMessage = 
event.Args[1].(string) //nolint:errcheck
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 8176f7ad..2dae255f 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -1062,7 +1062,7 @@ func TestCompleted(t *testing.T) {
 }
 
 func assertResourceUsage(t *testing.T, appSummary *ApplicationSummary, 
memorySeconds int64, vcoresSecconds int64) {
-       detailedResource := appSummary.ResourceUsage.UsedResourceMap[instType1]
+       detailedResource := 
appSummary.ResourceUsage.TrackedResourceMap[instType1]
        assert.Equal(t, memorySeconds, detailedResource["memory"])
        assert.Equal(t, vcoresSecconds, detailedResource["vcores"])
 }
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index caf2296b..71c487bc 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -1462,7 +1462,7 @@ func (pc *PartitionContext) moveTerminatedApp(appID 
string) {
                zap.String("appID", appID),
                zap.String("app status", app.CurrentState()))
        app.LogAppSummary(pc.RmID)
-       app.CleanupUsedResource()
+       app.CleanupTrackedResource()
        pc.Lock()
        defer pc.Unlock()
        delete(pc.applications, appID)
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 3cef5c8e..e4780af8 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -1815,9 +1815,28 @@ func TestRequiredNodeAllocation(t *testing.T) {
        assertLimits(t, getTestUserGroup(), resources.Multiply(res, 2))
 }
 
+func assertPreemptedResource(t *testing.T, appSummary 
*objects.ApplicationSummary, memorySeconds int64,
+       vcoresSecconds int64) {
+       detailedResource := 
appSummary.PreemptedResource.TrackedResourceMap["UNKNOWN"]
+       memValue, memPresent := detailedResource["memory"]
+       vcoreValue, vcorePresent := detailedResource["vcore"]
+
+       if memorySeconds != -1 {
+               assert.Equal(t, memorySeconds, memValue)
+       } else {
+               assert.Equal(t, memPresent, false)
+       }
+
+       if vcoresSecconds != -1 {
+               assert.Equal(t, vcoresSecconds, vcoreValue)
+       } else {
+               assert.Equal(t, vcorePresent, false)
+       }
+}
+
 func TestPreemption(t *testing.T) {
        setupUGM()
-       partition, _, app2, alloc1, alloc2 := setupPreemption(t)
+       partition, app1, app2, alloc1, alloc2 := setupPreemption(t)
 
        res, err := resources.NewResourceFromConf(map[string]string{"vcore": 
"5"})
        assert.NilError(t, err, "failed to create resource")
@@ -1828,7 +1847,8 @@ func TestPreemption(t *testing.T) {
        assert.NilError(t, err, "failed to add ask alloc-3 to app-2")
 
        // delay so that preemption delay passes
-       time.Sleep(100 * time.Millisecond)
+       // also make the delay 1 second to have a minimum non-zero 
resource*seconds measurement for preempted resources
+       time.Sleep(time.Second)
 
        // third allocation should not succeed, as we are currently above 
capacity
        alloc := partition.tryAllocate()
@@ -1873,6 +1893,12 @@ func TestPreemption(t *testing.T) {
        assert.Equal(t, alloc.GetResult(), objects.Allocated, "result should be 
allocated")
        assert.Equal(t, alloc.GetAllocationKey(), allocID3, "expected ask 
alloc-3 to be allocated")
        assertUserGroupResourceMaxLimits(t, getTestUserGroup(), 
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 10000}), 
getExpectedQueuesLimitsForPreemption())
+
+       appSummary := app1.GetApplicationSummary("default")
+       assertPreemptedResource(t, appSummary, -1, 5000)
+
+       appSummary = app2.GetApplicationSummary("default")
+       assertPreemptedResource(t, appSummary, -1, 0)
 }
 
 // Preemption followed by a normal allocation


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to