This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new c0140a98 [YUNIKORN-2151] Report resource used by placeholder pods in
the app summary (#715)
c0140a98 is described below
commit c0140a984db9063867b41cae76bd9191180500cd
Author: qzhu <[email protected]>
AuthorDate: Thu Nov 16 16:32:12 2023 +0100
[YUNIKORN-2151] Report resource used by placeholder pods in the app summary
(#715)
Closes: #715
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/scheduler/objects/application.go | 69 ++++++++++++++++++++-----------
pkg/scheduler/objects/application_test.go | 37 ++++++++++++-----
2 files changed, 71 insertions(+), 35 deletions(-)
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index da3ca668..3e27613d 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -87,8 +87,9 @@ type Application struct {
user security.UserGroup // owner of the application
allocatedResource *resources.Resource // total allocated resources
- usedResource *resources.TrackedResource // keep track of resource usage of the application
- preemptedResource *resources.TrackedResource // keep track of preempted resource usage of the application
+ usedResource *resources.TrackedResource // keep track of resource usage of the application
+ preemptedResource *resources.TrackedResource // keep track of preempted resource usage of the application
+ placeholderResource *resources.TrackedResource // keep track of placeholder resource usage of the application
maxAllocatedResource *resources.Resource // max allocated resources
allocatedPlaceholder *resources.Resource // total allocated placeholder resources
@@ -118,16 +119,17 @@ type Application struct {
}
type ApplicationSummary struct {
- ApplicationID string
- SubmissionTime time.Time
- StartTime time.Time
- FinishTime time.Time
- User string
- Queue string
- State string
- RmID string
- ResourceUsage *resources.TrackedResource
- PreemptedResource *resources.TrackedResource
+ ApplicationID string
+ SubmissionTime time.Time
+ StartTime time.Time
+ FinishTime time.Time
+ User string
+ Queue string
+ State string
+ RmID string
+ ResourceUsage *resources.TrackedResource
+ PreemptedResource *resources.TrackedResource
+ PlaceholderResource *resources.TrackedResource
}
func (as *ApplicationSummary) DoLogging() {
@@ -142,6 +144,7 @@ func (as *ApplicationSummary) DoLogging() {
zap.String("rmID", as.RmID),
zap.Any("resourceUsage", as.ResourceUsage.TrackedResourceMap),
zap.Any("preemptedResource",
as.PreemptedResource.TrackedResourceMap),
+ zap.Any("placeHolderResource",
as.PlaceholderResource.TrackedResourceMap),
)
}
@@ -149,19 +152,21 @@ func (sa *Application) GetApplicationSummary(rmID string)
*ApplicationSummary {
sa.RLock()
defer sa.RUnlock()
state := sa.stateMachine.Current()
- ru := sa.usedResource.Clone()
- pu := sa.preemptedResource.Clone()
+ resourceUsage := sa.usedResource.Clone()
+ preemptedUsage := sa.preemptedResource.Clone()
+ placeHolderUsage := sa.placeholderResource.Clone()
appSummary := &ApplicationSummary{
- ApplicationID: sa.ApplicationID,
- SubmissionTime: sa.SubmissionTime,
- StartTime: sa.startTime,
- FinishTime: sa.finishedTime,
- User: sa.user.User,
- Queue: sa.queuePath,
- State: state,
- RmID: rmID,
- ResourceUsage: ru,
- PreemptedResource: pu,
+ ApplicationID: sa.ApplicationID,
+ SubmissionTime: sa.SubmissionTime,
+ StartTime: sa.startTime,
+ FinishTime: sa.finishedTime,
+ User: sa.user.User,
+ Queue: sa.queuePath,
+ State: state,
+ RmID: rmID,
+ ResourceUsage: resourceUsage,
+ PreemptedResource: preemptedUsage,
+ PlaceholderResource: placeHolderUsage,
}
return appSummary
}
@@ -177,6 +182,7 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi
security.UserGroup, eve
allocatedResource: resources.NewResource(),
usedResource: resources.NewTrackedResource(),
preemptedResource: resources.NewTrackedResource(),
+ placeholderResource: resources.NewTrackedResource(),
maxAllocatedResource: resources.NewResource(),
allocatedPlaceholder: resources.NewResource(),
requests: make(map[string]*AllocationAsk),
@@ -1684,6 +1690,8 @@ func (sa *Application) decUserResourceUsage(resource
*resources.Resource, remove
func (sa *Application) trackCompletedResource(info *Allocation) {
if info.IsPreempted() {
sa.updatePreemptedResource(info)
+ } else if info.IsPlaceholder() {
+ sa.updatePlaceholderResource(info)
} else {
sa.updateUsedResource(info)
}
@@ -1696,6 +1704,13 @@ func (sa *Application) updateUsedResource(info
*Allocation) {
info.GetAllocatedResource(), info.GetBindTime())
}
+// When the placeholder allocated with this allocation is to be removed,
+// have the placeholderResource to aggregate the resource used by this allocation
+func (sa *Application) updatePlaceholderResource(info *Allocation) {
+ sa.placeholderResource.AggregateTrackedResource(info.GetInstanceType(),
+ info.GetAllocatedResource(), info.GetBindTime())
+}
+
// When the resource allocated with this allocation is to be preempted,
// have the preemptedResource to aggregate the resource used by this allocation
func (sa *Application) updatePreemptedResource(info *Allocation) {
@@ -1791,6 +1806,9 @@ func (sa *Application) removeAllocationInternal(uuid
string, releaseType si.Term
eventWarning = "Application state not changed while removing a placeholder allocation"
}
}
+ // Aggregate the resources used by this alloc to the application's resource tracker
+ sa.trackCompletedResource(alloc)
+
sa.decUserResourceUsage(alloc.GetAllocatedResource(), removeApp)
} else {
sa.allocatedResource = resources.Sub(sa.allocatedResource,
alloc.GetAllocatedResource())
@@ -2033,6 +2051,7 @@ func (sa *Application) cleanupAsks() {
func (sa *Application) cleanupTrackedResource() {
sa.usedResource = nil
+ sa.placeholderResource = nil
sa.preemptedResource = nil
}
@@ -2049,6 +2068,8 @@ func (sa *Application) LogAppSummary(rmID string) {
appSummary := sa.GetApplicationSummary(rmID)
appSummary.DoLogging()
appSummary.ResourceUsage = nil
+ appSummary.PreemptedResource = nil
+ appSummary.PlaceholderResource = nil
}
func (sa *Application) HasPlaceholderAllocation() bool {
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 2dae255f..d52b86cd 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -1067,6 +1067,13 @@ func assertResourceUsage(t *testing.T, appSummary
*ApplicationSummary, memorySec
assert.Equal(t, vcoresSecconds, detailedResource["vcores"])
}
+func assertPlaceHolderResource(t *testing.T, appSummary *ApplicationSummary, memorySeconds int64,
+ vcoresSecconds int64) {
+ detailedResource := appSummary.PlaceholderResource.TrackedResourceMap[instType1]
+ assert.Equal(t, memorySeconds, detailedResource["memory"])
+ assert.Equal(t, vcoresSecconds, detailedResource["vcores"])
+}
+
func TestResourceUsageAggregation(t *testing.T) {
setupUGM()
@@ -1082,7 +1089,10 @@ func TestResourceUsageAggregation(t *testing.T) {
assert.NilError(t, err, "failed to create resource with error")
alloc := newAllocation(appID1, "uuid-1", nodeID1, res)
alloc.SetInstanceType(instType1)
+ // Mock the time to be 3 seconds before
+ alloc.SetBindTime(time.Now().Add(-3 * time.Second))
app.AddAllocation(alloc)
+
if !resources.Equals(app.allocatedResource, res) {
t.Errorf("allocated resources is not updated correctly: %v",
app.allocatedResource)
}
@@ -1094,8 +1104,6 @@ func TestResourceUsageAggregation(t *testing.T) {
err = app.HandleApplicationEvent(RunApplication)
assert.NilError(t, err, "no error expected new to accepted (completed
test)")
- time.Sleep(3 * time.Second)
-
appSummary := app.GetApplicationSummary("default")
appSummary.DoLogging()
assertResourceUsage(t, appSummary, 0, 0)
@@ -1103,11 +1111,12 @@ func TestResourceUsageAggregation(t *testing.T) {
// add more allocations to test the removals
alloc = newAllocation(appID1, "uuid-2", nodeID1, res)
alloc.SetInstanceType(instType1)
+
+ // Mock the time to be 3 seconds before
+ alloc.SetBindTime(time.Now().Add(-3 * time.Second))
app.AddAllocation(alloc)
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res,
2))
- time.Sleep(3 * time.Second)
-
// remove one of the 2
if alloc = app.RemoveAllocation("uuid-2",
si.TerminationType_UNKNOWN_TERMINATION_TYPE); alloc == nil {
t.Error("returned allocations was nil allocation was not
removed")
@@ -1125,8 +1134,6 @@ func TestResourceUsageAggregation(t *testing.T) {
assert.Equal(t, len(allocs), 2)
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res,
2))
- time.Sleep(3 * time.Second)
-
appSummary = app.GetApplicationSummary("default")
appSummary.DoLogging()
assertResourceUsage(t, appSummary, 300, 30)
@@ -1136,8 +1143,6 @@ func TestResourceUsageAggregation(t *testing.T) {
t.Errorf("returned allocations was not allocation was
incorrectly removed: %v", alloc)
}
- time.Sleep(3 * time.Second)
-
// remove all left over allocations
if allocs = app.RemoveAllAllocations(); allocs == nil || len(allocs) !=
2 {
t.Errorf("returned number of allocations was incorrect: %v",
allocs)
@@ -1151,7 +1156,7 @@ func TestResourceUsageAggregation(t *testing.T) {
appSummary = app.GetApplicationSummary("default")
appSummary.DoLogging()
- assertResourceUsage(t, appSummary, 2100, 210)
+ assertResourceUsage(t, appSummary, 600, 60)
}
func TestRejected(t *testing.T) {
@@ -1332,6 +1337,9 @@ func TestReplaceAllocationTracking(t *testing.T) {
ph1 := newPlaceholderAlloc(appID1, "uuid-1", nodeID1, res)
ph2 := newPlaceholderAlloc(appID1, "uuid-2", nodeID1, res)
ph3 := newPlaceholderAlloc(appID1, "uuid-3", nodeID1, res)
+ ph1.SetInstanceType(instType1)
+ ph2.SetInstanceType(instType1)
+ ph3.SetInstanceType(instType1)
app.AddAllocation(ph1)
assert.NilError(t, err, "could not add ask")
app.addPlaceholderDataWithLocking(ph1.GetAsk())
@@ -1343,6 +1351,10 @@ func TestReplaceAllocationTracking(t *testing.T) {
assert.NilError(t, err, "could not add ask")
app.addPlaceholderDataWithLocking(ph3.GetAsk())
+ ph1.SetBindTime(time.Now().Add(-10 * time.Second))
+ ph2.SetBindTime(time.Now().Add(-10 * time.Second))
+ ph3.SetBindTime(time.Now().Add(-10 * time.Second))
+
// replace placeholders
realAlloc1 := newAllocation(appID1, "uuid-100", nodeID1, res)
realAlloc1.SetResult(Replaced)
@@ -1365,6 +1377,10 @@ func TestReplaceAllocationTracking(t *testing.T) {
app.RemoveAllocation("uuid-3", si.TerminationType_PLACEHOLDER_REPLACED)
assert.Equal(t, "uuid-3", alloc.uuid)
assert.Equal(t, false, app.HasPlaceholderAllocation())
+
+ // check placeholder resource usage
+ appSummary := app.GetApplicationSummary("default")
+ assertPlaceHolderResource(t, appSummary, 3000, 300)
}
func TestTimeoutPlaceholderSoftStyle(t *testing.T) {
@@ -1704,8 +1720,7 @@ func TestFinishedTime(t *testing.T) {
assert.Assert(t, app.finishedTime.IsZero())
assert.Assert(t, app.FinishedTime().IsZero())
- // sleep 1 second to make finished time bigger than zero
- time.Sleep(1 * time.Second)
+ // Don't need sleep here, anytime finished, we will set finishedTime for now
app.UnSetQueue()
assert.Assert(t, !app.finishedTime.IsZero())
assert.Assert(t, !app.FinishedTime().IsZero())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]