This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/branch-1.4 by this push:
new 7c0b0d94 [YUNIKORN-1848] Report resource used by preempted pods in the app summary (#649)
7c0b0d94 is described below
commit 7c0b0d94165c23791423684da70cd666c18bc8ef
Author: Yongjun Zhang <[email protected]>
AuthorDate: Mon Nov 13 17:01:13 2023 +0100
[YUNIKORN-1848] Report resource used by preempted pods in the app summary (#649)
Closes: #649
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/common/resources/resources.go | 22 ++++----
pkg/scheduler/objects/application.go | 86 ++++++++++++++++++++----------
pkg/scheduler/objects/application_state.go | 2 +-
pkg/scheduler/objects/application_test.go | 2 +-
pkg/scheduler/partition.go | 2 +-
pkg/scheduler/partition_test.go | 30 ++++++++++-
6 files changed, 100 insertions(+), 44 deletions(-)
diff --git a/pkg/common/resources/resources.go
b/pkg/common/resources/resources.go
index bf96e98f..52a107cb 100644
--- a/pkg/common/resources/resources.go
+++ b/pkg/common/resources/resources.go
@@ -47,43 +47,43 @@ func (q Quantity) string() string {
}
// Util struct to keep track of application resource usage
-type UsedResource struct {
+type TrackedResource struct {
// Two level map for aggregated resource usage
// With instance type being the top level key, the mapped value is a map:
// resource type (CPU, memory etc) -> the aggregated used time (in seconds) of the resource type
//
- UsedResourceMap map[string]map[string]int64
+ TrackedResourceMap map[string]map[string]int64
sync.RWMutex
}
-func (ur *UsedResource) Clone() *UsedResource {
+func (ur *TrackedResource) Clone() *TrackedResource {
if ur == nil {
return nil
}
- ret := NewUsedResource()
+ ret := NewTrackedResource()
ur.RLock()
defer ur.RUnlock()
- for k, v := range ur.UsedResourceMap {
+ for k, v := range ur.TrackedResourceMap {
dest := make(map[string]int64)
for key, element := range v {
dest[key] = element
}
- ret.UsedResourceMap[k] = dest
+ ret.TrackedResourceMap[k] = dest
}
return ret
}
// Aggregate the resource usage to UsedResourceMap[instType]
// The time the given resource used is the delta between the resource createTime and currentTime
-func (ur *UsedResource) AggregateUsedResource(instType string,
+func (ur *TrackedResource) AggregateTrackedResource(instType string,
resource *Resource, bindTime time.Time) {
ur.Lock()
defer ur.Unlock()
releaseTime := time.Now()
timeDiff := int64(releaseTime.Sub(bindTime).Seconds())
- aggregatedResourceTime, ok := ur.UsedResourceMap[instType]
+ aggregatedResourceTime, ok := ur.TrackedResourceMap[instType]
if !ok {
aggregatedResourceTime = map[string]int64{}
}
@@ -95,7 +95,7 @@ func (ur *UsedResource) AggregateUsedResource(instType string,
curUsage += int64(element) * timeDiff // resource size times timeDiff
aggregatedResourceTime[key] = curUsage
}
- ur.UsedResourceMap[instType] = aggregatedResourceTime
+ ur.TrackedResourceMap[instType] = aggregatedResourceTime
}
// Never update value of Zero
@@ -157,8 +157,8 @@ func NewResourceFromConf(configMap map[string]string)
(*Resource, error) {
return res, nil
}
-func NewUsedResource() *UsedResource {
- return &UsedResource{UsedResourceMap: make(map[string]map[string]int64)}
+func NewTrackedResource() *TrackedResource {
+ return &TrackedResource{TrackedResourceMap:
make(map[string]map[string]int64)}
}
func (r *Resource) String() string {
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index 7eb739c4..da3ca668 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -87,7 +87,8 @@ type Application struct {
user security.UserGroup // owner of the application
allocatedResource *resources.Resource // total allocated resources
- usedResource *resources.UsedResource // keep track of resource usage of the application
+ usedResource *resources.TrackedResource // keep track of resource usage of the application
+ preemptedResource *resources.TrackedResource // keep track of preempted resource usage of the application
maxAllocatedResource *resources.Resource // max allocated resources
allocatedPlaceholder *resources.Resource // total allocated placeholder resources
@@ -117,15 +118,16 @@ type Application struct {
}
type ApplicationSummary struct {
- ApplicationID string
- SubmissionTime time.Time
- StartTime time.Time
- FinishTime time.Time
- User string
- Queue string
- State string
- RmID string
- ResourceUsage *resources.UsedResource
+ ApplicationID string
+ SubmissionTime time.Time
+ StartTime time.Time
+ FinishTime time.Time
+ User string
+ Queue string
+ State string
+ RmID string
+ ResourceUsage *resources.TrackedResource
+ PreemptedResource *resources.TrackedResource
}
func (as *ApplicationSummary) DoLogging() {
@@ -138,24 +140,28 @@ func (as *ApplicationSummary) DoLogging() {
zap.String("queue", as.Queue),
zap.String("state", as.State),
zap.String("rmID", as.RmID),
- zap.Any("resourceUsage", as.ResourceUsage.UsedResourceMap))
+ zap.Any("resourceUsage", as.ResourceUsage.TrackedResourceMap),
+ zap.Any("preemptedResource",
as.PreemptedResource.TrackedResourceMap),
+ )
}
func (sa *Application) GetApplicationSummary(rmID string) *ApplicationSummary {
- state := sa.stateMachine.Current()
- ru := sa.usedResource.Clone()
sa.RLock()
defer sa.RUnlock()
+ state := sa.stateMachine.Current()
+ ru := sa.usedResource.Clone()
+ pu := sa.preemptedResource.Clone()
appSummary := &ApplicationSummary{
- ApplicationID: sa.ApplicationID,
- SubmissionTime: sa.SubmissionTime,
- StartTime: sa.startTime,
- FinishTime: sa.finishedTime,
- User: sa.user.User,
- Queue: sa.queuePath,
- State: state,
- RmID: rmID,
- ResourceUsage: ru,
+ ApplicationID: sa.ApplicationID,
+ SubmissionTime: sa.SubmissionTime,
+ StartTime: sa.startTime,
+ FinishTime: sa.finishedTime,
+ User: sa.user.User,
+ Queue: sa.queuePath,
+ State: state,
+ RmID: rmID,
+ ResourceUsage: ru,
+ PreemptedResource: pu,
}
return appSummary
}
@@ -169,7 +175,8 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi
security.UserGroup, eve
tags: siApp.Tags,
pending: resources.NewResource(),
allocatedResource: resources.NewResource(),
- usedResource: resources.NewUsedResource(),
+ usedResource: resources.NewTrackedResource(),
+ preemptedResource: resources.NewTrackedResource(),
maxAllocatedResource: resources.NewResource(),
allocatedPlaceholder: resources.NewResource(),
requests: make(map[string]*AllocationAsk),
@@ -1673,10 +1680,26 @@ func (sa *Application) decUserResourceUsage(resource
*resources.Resource, remove
ugm.GetUserManager().DecreaseTrackedResource(sa.queuePath,
sa.ApplicationID, resource, sa.user, removeApp)
}
+// Track used and preempted resources
+func (sa *Application) trackCompletedResource(info *Allocation) {
+ if info.IsPreempted() {
+ sa.updatePreemptedResource(info)
+ } else {
+ sa.updateUsedResource(info)
+ }
+}
+
// When the resource allocated with this allocation is to be removed,
// have the usedResource to aggregate the resource used by this allocation
func (sa *Application) updateUsedResource(info *Allocation) {
- sa.usedResource.AggregateUsedResource(info.GetInstanceType(),
+ sa.usedResource.AggregateTrackedResource(info.GetInstanceType(),
+ info.GetAllocatedResource(), info.GetBindTime())
+}
+
+// When the resource allocated with this allocation is to be preempted,
+// have the preemptedResource to aggregate the resource used by this allocation
+func (sa *Application) updatePreemptedResource(info *Allocation) {
+ sa.preemptedResource.AggregateTrackedResource(info.GetInstanceType(),
info.GetAllocatedResource(), info.GetBindTime())
}
@@ -1772,8 +1795,8 @@ func (sa *Application) removeAllocationInternal(uuid
string, releaseType si.Term
} else {
sa.allocatedResource = resources.Sub(sa.allocatedResource,
alloc.GetAllocatedResource())
- // Aggregate the resources used by this alloc to the application's user resource tracker
- sa.updateUsedResource(alloc)
+ // Aggregate the resources used by this alloc to the application's resource tracker
+ sa.trackCompletedResource(alloc)
// When the resource trackers are zero we should not expect anything to come in later.
if sa.hasZeroAllocations() {
@@ -1825,7 +1848,7 @@ func (sa *Application) RemoveAllAllocations()
[]*Allocation {
for _, alloc := range sa.allocations {
allocationsToRelease = append(allocationsToRelease, alloc)
// Aggregate the resources used by this alloc to the application's user resource tracker
- sa.updateUsedResource(alloc)
+ sa.trackCompletedResource(alloc)
sa.appEvents.sendRemoveAllocationEvent(alloc,
si.TerminationType_STOPPED_BY_RM)
}
@@ -2008,8 +2031,15 @@ func (sa *Application) cleanupAsks() {
sa.sortedRequests = nil
}
-func (sa *Application) CleanupUsedResource() {
+func (sa *Application) cleanupTrackedResource() {
sa.usedResource = nil
+ sa.preemptedResource = nil
+}
+
+func (sa *Application) CleanupTrackedResource() {
+ sa.Lock()
+ defer sa.Unlock()
+ sa.cleanupTrackedResource()
}
func (sa *Application) LogAppSummary(rmID string) {
diff --git a/pkg/scheduler/objects/application_state.go
b/pkg/scheduler/objects/application_state.go
index 925d80f4..03d427f9 100644
--- a/pkg/scheduler/objects/application_state.go
+++ b/pkg/scheduler/objects/application_state.go
@@ -199,7 +199,7 @@ func NewAppState() *fsm.FSM {
metrics.GetSchedulerMetrics().IncTotalApplicationsRejected()
app.setStateTimer(terminatedTimeout,
app.stateMachine.Current(), ExpireApplication)
app.finishedTime = time.Now()
- app.CleanupUsedResource()
+ app.cleanupTrackedResource()
// No rejected message when use
app.HandleApplicationEvent(RejectApplication)
if len(event.Args) == 2 {
app.rejectedMessage =
event.Args[1].(string) //nolint:errcheck
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index 8176f7ad..2dae255f 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -1062,7 +1062,7 @@ func TestCompleted(t *testing.T) {
}
func assertResourceUsage(t *testing.T, appSummary *ApplicationSummary,
memorySeconds int64, vcoresSecconds int64) {
- detailedResource := appSummary.ResourceUsage.UsedResourceMap[instType1]
+ detailedResource :=
appSummary.ResourceUsage.TrackedResourceMap[instType1]
assert.Equal(t, memorySeconds, detailedResource["memory"])
assert.Equal(t, vcoresSecconds, detailedResource["vcores"])
}
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index caf2296b..71c487bc 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -1462,7 +1462,7 @@ func (pc *PartitionContext) moveTerminatedApp(appID
string) {
zap.String("appID", appID),
zap.String("app status", app.CurrentState()))
app.LogAppSummary(pc.RmID)
- app.CleanupUsedResource()
+ app.CleanupTrackedResource()
pc.Lock()
defer pc.Unlock()
delete(pc.applications, appID)
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 3cef5c8e..e4780af8 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -1815,9 +1815,28 @@ func TestRequiredNodeAllocation(t *testing.T) {
assertLimits(t, getTestUserGroup(), resources.Multiply(res, 2))
}
+func assertPreemptedResource(t *testing.T, appSummary
*objects.ApplicationSummary, memorySeconds int64,
+ vcoresSecconds int64) {
+ detailedResource :=
appSummary.PreemptedResource.TrackedResourceMap["UNKNOWN"]
+ memValue, memPresent := detailedResource["memory"]
+ vcoreValue, vcorePresent := detailedResource["vcore"]
+
+ if memorySeconds != -1 {
+ assert.Equal(t, memorySeconds, memValue)
+ } else {
+ assert.Equal(t, memPresent, false)
+ }
+
+ if vcoresSecconds != -1 {
+ assert.Equal(t, vcoresSecconds, vcoreValue)
+ } else {
+ assert.Equal(t, vcorePresent, false)
+ }
+}
+
func TestPreemption(t *testing.T) {
setupUGM()
- partition, _, app2, alloc1, alloc2 := setupPreemption(t)
+ partition, app1, app2, alloc1, alloc2 := setupPreemption(t)
res, err := resources.NewResourceFromConf(map[string]string{"vcore":
"5"})
assert.NilError(t, err, "failed to create resource")
@@ -1828,7 +1847,8 @@ func TestPreemption(t *testing.T) {
assert.NilError(t, err, "failed to add ask alloc-3 to app-2")
// delay so that preemption delay passes
- time.Sleep(100 * time.Millisecond)
+ // also make the delay 1 second to have a minimum non-zero resource*seconds measurement for preempted resources
+ time.Sleep(time.Second)
// third allocation should not succeed, as we are currently above capacity
alloc := partition.tryAllocate()
@@ -1873,6 +1893,12 @@ func TestPreemption(t *testing.T) {
assert.Equal(t, alloc.GetResult(), objects.Allocated, "result should be
allocated")
assert.Equal(t, alloc.GetAllocationKey(), allocID3, "expected ask
alloc-3 to be allocated")
assertUserGroupResourceMaxLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 10000}),
getExpectedQueuesLimitsForPreemption())
+
+ appSummary := app1.GetApplicationSummary("default")
+ assertPreemptedResource(t, appSummary, -1, 5000)
+
+ appSummary = app2.GetApplicationSummary("default")
+ assertPreemptedResource(t, appSummary, -1, 0)
}
// Preemption followed by a normal allocation
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]