This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new d6874112 [YUNIKORN-2423] Remove unnecessary boolean return value from the tracking code (#810)
d6874112 is described below
commit d687411221c5d27c5071c58dc4f4fc603dfaea65
Author: Peter Bacsko <[email protected]>
AuthorDate: Fri Apr 5 19:20:43 2024 +0200
[YUNIKORN-2423] Remove unnecessary boolean return value from the tracking code (#810)
Closes: #810
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/scheduler/ugm/group_tracker.go | 10 +-
pkg/scheduler/ugm/group_tracker_test.go | 65 +++---------
pkg/scheduler/ugm/manager.go | 29 ++----
pkg/scheduler/ugm/manager_test.go | 174 +++++++++-----------------------
pkg/scheduler/ugm/queue_tracker.go | 18 ++--
pkg/scheduler/ugm/queue_tracker_test.go | 91 ++++-------------
pkg/scheduler/ugm/user_tracker.go | 35 ++-----
pkg/scheduler/ugm/user_tracker_test.go | 67 +++---------
8 files changed, 122 insertions(+), 367 deletions(-)
diff --git a/pkg/scheduler/ugm/group_tracker.go b/pkg/scheduler/ugm/group_tracker.go
index 4287172b..ffc0292f 100644
--- a/pkg/scheduler/ugm/group_tracker.go
+++ b/pkg/scheduler/ugm/group_tracker.go
@@ -48,20 +48,20 @@ func newGroupTracker(groupName string, events *ugmEvents)
*GroupTracker {
return groupTracker
}
-func (gt *GroupTracker) increaseTrackedResource(queuePath, applicationID
string, usage *resources.Resource, user string) bool {
+func (gt *GroupTracker) increaseTrackedResource(queuePath, applicationID
string, usage *resources.Resource, user string) {
if gt == nil {
- return true
+ return
}
gt.Lock()
defer gt.Unlock()
gt.events.sendIncResourceUsageForGroup(gt.groupName, queuePath, usage)
gt.applications[applicationID] = user
- return gt.queueTracker.increaseTrackedResource(strings.Split(queuePath,
configs.DOT), applicationID, group, usage)
+ gt.queueTracker.increaseTrackedResource(strings.Split(queuePath,
configs.DOT), applicationID, group, usage)
}
-func (gt *GroupTracker) decreaseTrackedResource(queuePath, applicationID
string, usage *resources.Resource, removeApp bool) (bool, bool) {
+func (gt *GroupTracker) decreaseTrackedResource(queuePath, applicationID
string, usage *resources.Resource, removeApp bool) bool {
if gt == nil {
- return false, true
+ return false
}
gt.Lock()
defer gt.Unlock()
diff --git a/pkg/scheduler/ugm/group_tracker_test.go b/pkg/scheduler/ugm/group_tracker_test.go
index 3b0e33bc..e3af91ac 100644
--- a/pkg/scheduler/ugm/group_tracker_test.go
+++ b/pkg/scheduler/ugm/group_tracker_test.go
@@ -47,10 +47,7 @@ func TestGTIncreaseTrackedResource(t *testing.T) {
}
manager.Headroom(queuePath1, TestApp1, *user)
- result := groupTracker.increaseTrackedResource(path1, TestApp1, usage1,
user.User)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path1, TestApp1, usage1)
- }
+ groupTracker.increaseTrackedResource(path1, TestApp1, usage1, user.User)
assert.Equal(t, 1, len(eventSystem.Events))
assert.Equal(t, si.EventRecord_UG_GROUP_RESOURCE,
eventSystem.Events[0].EventChangeDetail)
assert.Equal(t, si.EventRecord_ADD,
eventSystem.Events[0].EventChangeType)
@@ -59,28 +56,19 @@ func TestGTIncreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage2)
}
- result = groupTracker.increaseTrackedResource(path2, TestApp2, usage2,
user.User)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path2, TestApp2, usage2)
- }
+ groupTracker.increaseTrackedResource(path2, TestApp2, usage2, user.User)
usage3, err := resources.NewResourceFromConf(map[string]string{"mem":
"30M", "vcore": "30"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- result = groupTracker.increaseTrackedResource(path3, TestApp3, usage3,
user.User)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path3, TestApp3, usage3)
- }
+ groupTracker.increaseTrackedResource(path3, TestApp3, usage3, user.User)
usage4, err := resources.NewResourceFromConf(map[string]string{"mem":
"20M", "vcore": "20"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- result = groupTracker.increaseTrackedResource(path4, TestApp4, usage4,
user.User)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path4, TestApp4, usage4)
- }
+ groupTracker.increaseTrackedResource(path4, TestApp4, usage4, user.User)
actualResources := getGroupResource(groupTracker)
assert.Equal(t, "map[mem:80000000 vcore:80000]",
actualResources["root"].String(), "wrong resource")
@@ -105,20 +93,14 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage1)
}
- result := groupTracker.increaseTrackedResource(path1, TestApp1, usage1,
user.User)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path1, TestApp1, usage1)
- }
+ groupTracker.increaseTrackedResource(path1, TestApp1, usage1, user.User)
assert.Equal(t, 1, len(groupTracker.getTrackedApplications()))
usage2, err := resources.NewResourceFromConf(map[string]string{"mem":
"20M", "vcore": "20"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage2)
}
- result = groupTracker.increaseTrackedResource(path2, TestApp2, usage2,
user.User)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path2, TestApp2, usage2)
- }
+ groupTracker.increaseTrackedResource(path2, TestApp2, usage2, user.User)
actualResources := getGroupResource(groupTracker)
assert.Equal(t, 2, len(groupTracker.getTrackedApplications()))
@@ -132,19 +114,13 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
eventSystem.Reset()
- removeQT, decreased := groupTracker.decreaseTrackedResource(path1,
TestApp1, usage3, false)
- if !decreased {
- t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", path1, TestApp1, usage3, err)
- }
+ removeQT := groupTracker.decreaseTrackedResource(path1, TestApp1,
usage3, false)
assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
assert.Equal(t, 1, len(eventSystem.Events))
assert.Equal(t, si.EventRecord_UG_GROUP_RESOURCE,
eventSystem.Events[0].EventChangeDetail)
assert.Equal(t, si.EventRecord_REMOVE,
eventSystem.Events[0].EventChangeType)
- removeQT, decreased = groupTracker.decreaseTrackedResource(path2,
TestApp2, usage3, false)
- if !decreased {
- t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", path2, TestApp2, usage3, err)
- }
+ removeQT = groupTracker.decreaseTrackedResource(path2, TestApp2,
usage3, false)
assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
actualResources1 := getGroupResource(groupTracker)
@@ -159,10 +135,7 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- removeQT, decreased = groupTracker.decreaseTrackedResource(path1,
TestApp1, usage4, true)
- if !decreased {
- t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", path1, TestApp1, usage1, err)
- }
+ removeQT = groupTracker.decreaseTrackedResource(path1, TestApp1,
usage4, true)
assert.Equal(t, 1, len(groupTracker.getTrackedApplications()))
assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
@@ -171,10 +144,7 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- removeQT, decreased = groupTracker.decreaseTrackedResource(path2,
TestApp2, usage5, true)
- if !decreased {
- t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", path2, TestApp2, usage2, err)
- }
+ removeQT = groupTracker.decreaseTrackedResource(path2, TestApp2,
usage5, true)
assert.Equal(t, 0, len(groupTracker.getTrackedApplications()))
assert.Equal(t, removeQT, true, "wrong remove queue tracker value")
}
@@ -192,10 +162,7 @@ func TestGTSetAndClearMaxLimits(t *testing.T) {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage1)
}
- result := groupTracker.increaseTrackedResource(path1, TestApp1, usage1,
user.User)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path1, TestApp1, usage1)
- }
+ groupTracker.increaseTrackedResource(path1, TestApp1, usage1, user.User)
// higher limits - apps can run
eventSystem.Reset()
@@ -208,14 +175,8 @@ func TestGTSetAndClearMaxLimits(t *testing.T) {
assert.Equal(t, si.EventRecord_UG_GROUP_LIMIT,
eventSystem.Events[1].EventChangeDetail)
assert.Equal(t, si.EventRecord_SET,
eventSystem.Events[1].EventChangeType)
assert.Equal(t, path5, eventSystem.Events[1].ReferenceID)
- result = groupTracker.increaseTrackedResource(path1, TestApp2, usage1,
user.User)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path1, TestApp2, usage1)
- }
- result = groupTracker.increaseTrackedResource(path1, TestApp3, usage1,
user.User)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path1, TestApp3, usage1)
- }
+ groupTracker.increaseTrackedResource(path1, TestApp2, usage1, user.User)
+ groupTracker.increaseTrackedResource(path1, TestApp3, usage1, user.User)
path1expectedHeadroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{
"mem": 20000000,
"vcore": 20000,
diff --git a/pkg/scheduler/ugm/manager.go b/pkg/scheduler/ugm/manager.go
index c166fbb2..b374edbe 100644
--- a/pkg/scheduler/ugm/manager.go
+++ b/pkg/scheduler/ugm/manager.go
@@ -78,7 +78,7 @@ type LimitConfig struct {
// IncreaseTrackedResource Increase the resource usage for the given user
group and queue path combination.
// As and when every allocation or asks requests fulfilled on application,
corresponding user and group
// resource usage would be increased against specific application.
-func (m *Manager) IncreaseTrackedResource(queuePath, applicationID string,
usage *resources.Resource, user security.UserGroup) bool {
+func (m *Manager) IncreaseTrackedResource(queuePath, applicationID string,
usage *resources.Resource, user security.UserGroup) {
log.Log(log.SchedUGM).Debug("Increasing resource usage",
zap.String("user", user.User),
zap.String("queue path", queuePath),
@@ -86,7 +86,7 @@ func (m *Manager) IncreaseTrackedResource(queuePath,
applicationID string, usage
zap.Stringer("resource", usage))
if queuePath == common.Empty || applicationID == common.Empty || usage
== nil || user.User == common.Empty {
log.Log(log.SchedUGM).Debug("Mandatory parameters are missing
to increase the resource usage")
- return false
+ return
}
// since we check headroom before an increase this should never result
in a creation...
// some tests might not go through a scheduling that cycle so leave this
@@ -98,14 +98,14 @@ func (m *Manager) IncreaseTrackedResource(queuePath,
applicationID string, usage
m.ensureGroupTrackerForApp(queuePath, applicationID, user)
}
- return userTracker.increaseTrackedResource(queuePath, applicationID,
usage)
+ userTracker.increaseTrackedResource(queuePath, applicationID, usage)
}
// DecreaseTrackedResource Decrease the resource usage for the given user
group and queue path combination.
// As and when every allocation or asks release happens, corresponding user
and group
// resource usage would be decreased against specific application. When the
final asks release happens, removeApp should be set to true and
// application itself would be removed from the tracker and no more usage
would be tracked further for that specific application.
-func (m *Manager) DecreaseTrackedResource(queuePath, applicationID string,
usage *resources.Resource, user security.UserGroup, removeApp bool) bool {
+func (m *Manager) DecreaseTrackedResource(queuePath, applicationID string,
usage *resources.Resource, user security.UserGroup, removeApp bool) {
log.Log(log.SchedUGM).Debug("Decreasing resource usage",
zap.String("user", user.User),
zap.String("queue path", queuePath),
@@ -114,14 +114,14 @@ func (m *Manager) DecreaseTrackedResource(queuePath,
applicationID string, usage
zap.Bool("removeApp", removeApp))
if queuePath == common.Empty || applicationID == common.Empty || usage
== nil || user.User == common.Empty {
log.Log(log.SchedUGM).Debug("Mandatory parameters are missing
to decrease the resource usage")
- return false
+ return
}
userTracker := m.GetUserTracker(user.User)
if userTracker == nil {
log.Log(log.SchedUGM).Error("user tracker must be available in
userTrackers map",
zap.String("user", user.User))
- return false
+ return
}
// get the group now as the decrease might remove the app from the user
if removeApp is true
@@ -133,25 +133,21 @@ func (m *Manager) DecreaseTrackedResource(queuePath,
applicationID string, usage
zap.String("tracked group", appGroup),
zap.Stringer("resource", usage),
zap.Bool("removeApp", removeApp))
- removeQT, decreased := userTracker.decreaseTrackedResource(queuePath,
applicationID, usage, removeApp)
- if !decreased {
- return decreased
- }
- if removeQT {
+ if userTracker.decreaseTrackedResource(queuePath, applicationID, usage,
removeApp) {
log.Log(log.SchedUGM).Info("Removing user from manager",
zap.String("user", user.User))
delete(m.userTrackers, user.User)
}
// if the app did not have a group we're done otherwise update the
groupTracker
if appGroup == common.Empty {
- return decreased
+ return
}
groupTracker := m.GetGroupTracker(appGroup)
if groupTracker == nil {
log.Log(log.SchedUGM).Error("group tracker should be available
in groupTrackers map",
zap.String("applicationID", applicationID),
zap.String("applicationID", appGroup))
- return decreased
+ return
}
log.Log(log.SchedUGM).Debug("Decreasing resource usage for group",
zap.String("group", appGroup),
@@ -159,11 +155,7 @@ func (m *Manager) DecreaseTrackedResource(queuePath,
applicationID string, usage
zap.String("application", applicationID),
zap.Stringer("resource", usage),
zap.Bool("removeApp", removeApp))
- removeQT, decreased = groupTracker.decreaseTrackedResource(queuePath,
applicationID, usage, removeApp)
- if !decreased {
- return decreased
- }
- if removeQT {
+ if groupTracker.decreaseTrackedResource(queuePath, applicationID,
usage, removeApp) {
log.Log(log.SchedUGM).Info("Removing group from manager",
zap.String("group", appGroup),
zap.String("queue path", queuePath),
@@ -171,7 +163,6 @@ func (m *Manager) DecreaseTrackedResource(queuePath,
applicationID string, usage
zap.Bool("removeApp", removeApp))
delete(m.groupTrackers, appGroup)
}
- return true
}
func (m *Manager) GetUsersResources() []*UserTracker {
diff --git a/pkg/scheduler/ugm/manager_test.go b/pkg/scheduler/ugm/manager_test.go
index 2b7add0d..81da27f7 100644
--- a/pkg/scheduler/ugm/manager_test.go
+++ b/pkg/scheduler/ugm/manager_test.go
@@ -89,8 +89,7 @@ func TestGetGroup(t *testing.T) {
usage1, err :=
resources.NewResourceFromConf(map[string]string{"memory": "5", "vcores": "5"})
assert.NilError(t, err)
- increased := manager.IncreaseTrackedResource("root.parent.leaf",
TestApp1, usage1, user1)
- assert.Equal(t, increased, true)
+ manager.IncreaseTrackedResource("root.parent.leaf", TestApp1, usage1,
user1)
assert.Equal(t, len(manager.userTrackers), 1)
assert.Equal(t, len(manager.groupTrackers), 0)
@@ -136,13 +135,11 @@ func TestGetGroup(t *testing.T) {
// do some activities with different users - user "test1" and "test2"
belongs to "test_root" group
// ensure user "test1" and "test2" and its group linkage is intact
through appGroupTrackers map
- increased = manager.IncreaseTrackedResource("root.parent.leaf",
TestApp1, usage1, user)
- assert.Equal(t, increased, true)
+ manager.IncreaseTrackedResource("root.parent.leaf", TestApp1, usage1,
user)
assert.Equal(t, len(manager.userTrackers), 2)
user2 := security.UserGroup{User: "test2", Groups: []string{"test",
"test_root"}}
- increased = manager.IncreaseTrackedResource("root.parent.leaf",
TestApp2, usage1, user2)
- assert.Equal(t, increased, true)
+ manager.IncreaseTrackedResource("root.parent.leaf", TestApp2, usage1,
user2)
userGroupTrackersAssertsMap := map[string]string{}
userGroupTrackersAssertsMap[user.User] = TestApp1
@@ -160,8 +157,7 @@ func TestGetGroup(t *testing.T) {
assert.Equal(t, len(manager.groupTrackers), 2)
user3 := security.UserGroup{User: "test3", Groups: []string{"test"}}
- increased = manager.IncreaseTrackedResource("root.parent.leaf",
TestApp2, usage1, user3)
- assert.Equal(t, increased, true)
+ manager.IncreaseTrackedResource("root.parent.leaf", TestApp2, usage1,
user3)
assert.Equal(t, len(manager.userTrackers), 4)
ut := manager.GetUserTracker(user3.User)
actualGT := ut.appGroupTrackers[TestApp2]
@@ -195,15 +191,8 @@ func TestAddRemoveUserAndGroups(t *testing.T) {
manager.ClearUserTrackers()
manager.ClearGroupTrackers()
- increased := manager.IncreaseTrackedResource("", "", usage1, user)
- if increased {
- t.Errorf("mandatory parameters are missing. queuepath: ,
application id: , resource usage: %s, user: %s", usage1.String(), user.User)
- }
-
- increased = manager.IncreaseTrackedResource(queuePath1, TestApp1,
usage1, user)
- if !increased {
- t.Errorf("unable to increase tracked resource. queuepath: %s,
application id: %s, resource usage: %s, user: %s", queuePath1, TestApp1,
usage1.String(), user.User)
- }
+ manager.IncreaseTrackedResource("", "", usage1, user)
+ manager.IncreaseTrackedResource(queuePath1, TestApp1, usage1, user)
groupTrackers := manager.GetGroupsResources()
assert.Equal(t, len(groupTrackers), 0)
@@ -211,10 +200,7 @@ func TestAddRemoveUserAndGroups(t *testing.T) {
assert.Equal(t, user.User, manager.GetUserTracker(user.User).userName)
assert.Equal(t, manager.GetGroupTracker(user.Groups[0]) == nil, true)
- increased = manager.IncreaseTrackedResource(queuePath1, TestApp1,
usage1, user)
- if !increased {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp1, usage1)
- }
+ manager.IncreaseTrackedResource(queuePath1, TestApp1, usage1, user)
assertUGM(t, user, resources.Multiply(usage1, 2), 1)
user1 := security.UserGroup{User: "test1", Groups: []string{"test1"}}
@@ -222,10 +208,7 @@ func TestAddRemoveUserAndGroups(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage2)
}
- increased = manager.IncreaseTrackedResource(queuePath2, TestApp2,
usage2, user1)
- if !increased {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath2, TestApp2, usage2)
- }
+ manager.IncreaseTrackedResource(queuePath2, TestApp2, usage2, user1)
assertUGM(t, user1, usage2, 2)
assert.Equal(t, user.User, manager.GetUserTracker(user.User).userName)
assert.Equal(t, user1.User, manager.GetUserTracker(user1.User).userName)
@@ -249,26 +232,15 @@ func TestAddRemoveUserAndGroups(t *testing.T) {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- decreased := manager.DecreaseTrackedResource("", "", usage1, user,
false)
- assert.Equal(t, decreased, false)
-
- decreased = manager.DecreaseTrackedResource(queuePath1, TestApp1,
usage3, user, false)
- if !decreased {
- t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", queuePath1, TestApp1, usage3, err)
- }
+ manager.DecreaseTrackedResource("", "", usage1, user, false)
+ manager.DecreaseTrackedResource(queuePath1, TestApp1, usage3, user,
false)
assertUGM(t, user, usage1, 2)
- decreased = manager.DecreaseTrackedResource(queuePath1, TestApp1,
usage3, user, true)
- if !decreased {
- t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", queuePath1, TestApp1, usage3, err)
- }
+ manager.DecreaseTrackedResource(queuePath1, TestApp1, usage3, user,
true)
assert.Equal(t, 1, len(manager.GetUsersResources()), "userTrackers
count should be 1")
assert.Equal(t, 0, len(manager.GetGroupsResources()), "groupTrackers
count should be 1")
- decreased = manager.DecreaseTrackedResource(queuePath2, TestApp2,
usage2, user1, true)
- if !decreased {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", queuePath2, TestApp2, usage2, err)
- }
+ manager.DecreaseTrackedResource(queuePath2, TestApp2, usage2, user1,
true)
assert.Equal(t, 0, len(manager.GetUsersResources()), "userTrackers
count should be 0")
assert.Equal(t, 0, len(manager.GetGroupsResources()), "groupTrackers
count should be 0")
@@ -298,10 +270,7 @@ func TestUpdateConfig(t *testing.T) {
assertMaxLimits(t, user, expectedResource, 5)
for i := 1; i <= 5; i++ {
- increased := manager.IncreaseTrackedResource(queuePath1,
TestApp1, usage, user)
- if !increased {
- t.Fatalf("unable to increase tracked resource:
queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage, err)
- }
+ manager.IncreaseTrackedResource(queuePath1, TestApp1, usage,
user)
}
// configure max resource for root.parent lesser than current resource
usage. should be allowed to set but user cannot be allowed to do any activity
further
@@ -316,10 +285,7 @@ func TestUpdateConfig(t *testing.T) {
err = manager.UpdateConfig(conf.Queues[0], "root")
assert.NilError(t, err, "unable to set the limit for user user1 because
current resource usage is greater than config max resource for root.parent")
- increased := manager.IncreaseTrackedResource(queuePath1, TestApp1,
usage, user)
- if !increased {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp1, user)
- }
+ manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user)
// configure max resource for root lesser than current resource usage.
should be allowed to set but user cannot be allowed to do any activity further
conf = createConfig(user.User, user.Groups[0], "memory", "50", 10, 10)
@@ -384,8 +350,7 @@ func TestUseWildCard(t *testing.T) {
for i := 0; i < 5; i++ {
// should run as user has already fallen back on wild card user
limit set on "root.parent" map[memory:50 vcores:50]
- increased := manager.IncreaseTrackedResource(queuePath1,
TestApp1, usage, user1)
- assert.Assert(t, increased)
+ manager.IncreaseTrackedResource(queuePath1, TestApp1, usage,
user1)
}
// should not run as user has exceeded wild card user limit set on
"root.parent" map[memory:50 vcores:50]
@@ -437,10 +402,7 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t
*testing.T) {
assertMaxLimits(t, user, expectedResource, 5)
for i := 1; i <= 5; i++ {
- increased := manager.IncreaseTrackedResource(queuePath1,
TestApp1, usage, user)
- if !increased {
- t.Fatalf("unable to increase tracked resource:
queuepath %s, app %s, res %v", queuePath1, TestApp1, expectedResource)
- }
+ manager.IncreaseTrackedResource(queuePath1, TestApp1, usage,
user)
}
// configure max resource for root.parent as map[memory:60 vcores:60]
to allow one more application to run
@@ -449,8 +411,7 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t
*testing.T) {
assert.NilError(t, err, "unable to set the limit for user user1 because
current resource usage is greater than config max resource for root.parent")
// should run as user 'user' setting is map[memory:60 vcores:60] and
total usage of "root.parent" is map[memory:50 vcores:50]
- increased := manager.IncreaseTrackedResource(queuePath1, TestApp2,
usage, user)
- assert.Equal(t, increased, true)
+ manager.IncreaseTrackedResource(queuePath1, TestApp2, usage, user)
// should not run as user 'user' setting is map[memory:60 vcores:60]
and total usage of "root.parent" is map[memory:60 vcores:60]
headroom := manager.Headroom(queuePath1, TestApp3, user)
@@ -470,8 +431,7 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t
*testing.T) {
// user1 still should be able to run app as wild card user '*' setting
is map[memory:70 vcores:70] for "root.parent" and
// total usage of "root.parent" is map[memory:60 vcores:60]
- increased = manager.IncreaseTrackedResource(queuePath1, TestApp2,
usage, user)
- assert.Equal(t, increased, true)
+ manager.IncreaseTrackedResource(queuePath1, TestApp2, usage, user)
// user1 should not be able to run app as wild card user '*' setting is
map[memory:70 vcores:70] for "root.parent"
// and total usage of "root.parent" is map[memory:70 vcores:70]
@@ -494,8 +454,7 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t
*testing.T) {
// can be allowed to run upto resource usage map[memory:70 vcores:70]
for i := 1; i <= 7; i++ {
- increased = manager.IncreaseTrackedResource(queuePath1,
TestApp1, usage, user1)
- assert.Equal(t, increased, true)
+ manager.IncreaseTrackedResource(queuePath1, TestApp1, usage,
user1)
}
// user2 should not be able to run app as user2 max limit is
map[memory:70 vcores:70] and usage so far is map[memory:70 vcores:70]
@@ -507,8 +466,7 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t
*testing.T) {
assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
// user3 should be able to run app as group3 uses wild card group limit
settings map[memory:10 vcores:10]
- increased = manager.IncreaseTrackedResource(queuePath1, TestApp1,
usage, user3)
- assert.Equal(t, increased, true)
+ manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user3)
// user4 (though belongs to different group, group4) should not be able
to run app as group4 also
// uses wild card group limit settings map[memory:10 vcores:10]
@@ -525,8 +483,7 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t
*testing.T) {
// Now group4 would be used as user4 is running TestApp2 for the first
time. So can be allowed to run upto resource usage map[memory:70 vcores:70]
for i := 1; i <= 7; i++ {
- increased = manager.IncreaseTrackedResource(queuePath1,
TestApp2, usage, user4)
- assert.Equal(t, increased, true)
+ manager.IncreaseTrackedResource(queuePath1, TestApp2, usage,
user4)
}
// user4 should not be able to run app as user4 max limit is
map[memory:70 vcores:70] and usage so far is map[memory:70 vcores:70]
@@ -618,8 +575,7 @@ func TestUpdateConfigClearEarlierSetGroupLimits(t
*testing.T) {
}
cQueue := "root.parent.leaf"
for i := 1; i <= 2; i++ {
- increased := manager.IncreaseTrackedResource(cQueue, TestApp1,
usage, user)
- assert.Equal(t, increased, true, "unable to increase tracked
resource: queuepath "+cQueue+", app "+TestApp1+", res "+usage.String())
+ manager.IncreaseTrackedResource(cQueue, TestApp1, usage, user)
}
assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
headroom := manager.Headroom(cQueue, TestApp1, user)
@@ -646,47 +602,36 @@ func TestSetMaxLimitsForRemovedUsers(t *testing.T) {
assertMaxLimits(t, user, expectedResource, 5)
for i := 1; i <= 2; i++ {
- increased := manager.IncreaseTrackedResource(queuePath1,
TestApp1, usage, user)
- assert.Equal(t, increased, true, "unable to increase tracked
resource: queuepath "+queuePath1+", app "+TestApp1+", res "+usage.String())
+ manager.IncreaseTrackedResource(queuePath1, TestApp1, usage,
user)
}
assert.Equal(t, manager.GetUserTracker(user.User) != nil, true)
assert.Equal(t, manager.GetGroupTracker(user.Groups[0]) != nil, true)
- decreased := manager.DecreaseTrackedResource(queuePath1, TestApp1,
usage, user, false)
- assert.Equal(t, decreased, true)
-
- decreased = manager.DecreaseTrackedResource(queuePath1, TestApp1,
usage, user, true)
- assert.Equal(t, decreased, true)
+ manager.DecreaseTrackedResource(queuePath1, TestApp1, usage, user,
false)
+ manager.DecreaseTrackedResource(queuePath1, TestApp1, usage, user, true)
assert.Equal(t, manager.GetUserTracker(user.User) != nil, true)
assert.Equal(t, manager.GetGroupTracker(user.Groups[0]) != nil, true)
- increased := manager.IncreaseTrackedResource("root.parent.leaf",
TestApp1, usage, user)
- assert.Equal(t, increased, true, "unable to increase tracked resource:
queuepath root.parent.leaf, app "+TestApp1+", res "+usage.String())
-
+ manager.IncreaseTrackedResource("root.parent.leaf", TestApp1, usage,
user)
headroom := manager.Headroom("root.parent.leaf", TestApp1, user)
assert.Equal(t, headroom.FitInMaxUndef(usage), false)
assert.Equal(t, manager.GetUserTracker(user.User) != nil, true)
assert.Equal(t, manager.GetGroupTracker(user.Groups[0]) != nil, true)
- decreased = manager.DecreaseTrackedResource("root.parent.leaf",
TestApp1, usage, user, true)
- assert.Equal(t, decreased, true, "unable to decrease tracked resource:
queuepath root.parent.leaf, app "+TestApp1+", res "+usage.String())
+ manager.DecreaseTrackedResource("root.parent.leaf", TestApp1, usage,
user, true)
conf = createConfigWithoutLimits()
assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
for i := 1; i <= 2; i++ {
- increased := manager.IncreaseTrackedResource(queuePath1,
TestApp1, usage, user)
- assert.Equal(t, increased, true, "unable to increase tracked
resource: queuepath "+queuePath1+", app "+TestApp1+", res "+usage.String())
+ manager.IncreaseTrackedResource(queuePath1, TestApp1, usage,
user)
}
assert.Equal(t, manager.GetUserTracker(user.User) != nil, true)
assert.Equal(t, manager.GetGroupTracker(user.Groups[0]) == nil, true)
- decreased = manager.DecreaseTrackedResource(queuePath1, TestApp1,
usage, user, false)
- assert.Equal(t, decreased, true)
-
- decreased = manager.DecreaseTrackedResource(queuePath1, TestApp1,
usage, user, true)
- assert.Equal(t, decreased, true)
+ manager.DecreaseTrackedResource(queuePath1, TestApp1, usage, user,
false)
+ manager.DecreaseTrackedResource(queuePath1, TestApp1, usage, user, true)
assert.Equal(t, manager.GetUserTracker(user.User) == nil, true)
assert.Equal(t, manager.GetGroupTracker(user.Groups[0]) == nil, true)
}
@@ -712,9 +657,7 @@ func TestUserGroupHeadroom(t *testing.T) {
headroom := manager.Headroom("root.parent.leaf", TestApp1, user)
assert.Equal(t, resources.Equals(headroom, usage), true)
- increased := manager.IncreaseTrackedResource("root.parent.leaf",
TestApp1, usage, user)
- assert.Equal(t, increased, true, "unable to increase tracked resource:
queuepath "+queuePath1+", app "+TestApp1+", res "+usage.String())
-
+ manager.IncreaseTrackedResource("root.parent.leaf", TestApp1, usage,
user)
headroom = manager.Headroom("root.parent.leaf", TestApp1, user)
assert.Equal(t, manager.GetUserTracker(user.User) != nil, true)
assert.Equal(t, manager.GetGroupTracker(user.Groups[0]) != nil, true)
@@ -762,16 +705,13 @@ func TestDecreaseTrackedResourceForGroupTracker(t
*testing.T) {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage)
}
- increased := manager.IncreaseTrackedResource("root.parent", TestApp1,
usage, user)
- assert.Equal(t, increased, true, "unable to increase tracked resource:
queuepath root.parent, app "+TestApp1+", res "+usage.String())
-
+ manager.IncreaseTrackedResource("root.parent", TestApp1, usage, user)
groupTracker := m.GetGroupTracker(user.Groups[0])
assert.Equal(t, groupTracker != nil, true)
assert.Equal(t,
groupTracker.queueTracker.childQueueTrackers["parent"].runningApplications[TestApp1],
true)
assert.Equal(t,
resources.Equals(groupTracker.queueTracker.childQueueTrackers["parent"].resourceUsage,
usage), true)
- decreased := manager.DecreaseTrackedResource("root.parent", TestApp1,
usage, user, true)
- assert.Equal(t, decreased, true, "unable to decrease tracked resource:
queuepath root.parent, app "+TestApp1+", res "+usage.String())
+ manager.DecreaseTrackedResource("root.parent", TestApp1, usage, user,
true)
groupTracker = m.GetGroupTracker(user.Groups[0])
assert.Equal(t, groupTracker != nil, true)
@@ -804,10 +744,8 @@ func TestUserGroupLimitWithMultipleApps(t *testing.T) {
}
// run different apps with different groups linkage (different queue
limit settings)
- increased := manager.IncreaseTrackedResource(queuePath1, TestApp1,
usage, userGroup)
- assert.Equal(t, increased, true)
- increased = manager.IncreaseTrackedResource(queuePath2, TestApp2,
usage, userGroup)
- assert.Equal(t, increased, true)
+ manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, userGroup)
+ manager.IncreaseTrackedResource(queuePath2, TestApp2, usage, userGroup)
// ensure different groups are linked and resource usage is correct
assert.Equal(t, len(manager.getUserTracker("user").appGroupTrackers), 2)
@@ -825,10 +763,8 @@ func TestUserGroupLimitWithMultipleApps(t *testing.T) {
assert.Equal(t, resources.Equals(headroom, resources.Zero), true, "init
headroom is not expected")
// remove the apps
- decreased := manager.DecreaseTrackedResource(queuePath1, TestApp1,
usage, userGroup, true)
- assert.Equal(t, decreased, true)
- decreased = manager.DecreaseTrackedResource(queuePath2, TestApp2,
usage, userGroup, true)
- assert.Equal(t, decreased, true)
+ manager.DecreaseTrackedResource(queuePath1, TestApp1, usage, userGroup,
true)
+ manager.DecreaseTrackedResource(queuePath2, TestApp2, usage, userGroup,
true)
// assert group linkage has been removed
assert.Equal(t, len(manager.getUserTracker("user").appGroupTrackers), 0)
@@ -952,9 +888,7 @@ func TestCanRunApp(t *testing.T) {
canRunApp := manager.CanRunApp("root.default",
TestApp1, user)
assert.Equal(t, canRunApp, true, fmt.Sprintf("user %s
should be able to run app %s", user.User, TestApp1))
- increased :=
manager.IncreaseTrackedResource("root.default", TestApp1, usage, user)
- assert.Equal(t, increased, true, "unable to increase
tracked resource: queuepath root.parent, app "+TestApp1+", res "+usage.String())
-
+ manager.IncreaseTrackedResource("root.default",
TestApp1, usage, user)
canRunApp = manager.CanRunApp("root.default", TestApp2,
user)
assert.Equal(t, canRunApp, false, fmt.Sprintf("user %s
shouldn't be able to run app %s", user.User, TestApp2))
})
@@ -1011,9 +945,7 @@ func TestSeparateUserGroupHeadroom(t *testing.T) {
t.Errorf("new resource create returned error or
wrong resource: error %t, res %v", err, usage)
}
- increased :=
manager.IncreaseTrackedResource(queuePathParent, TestApp1, usage, tc.user)
- assert.Equal(t, increased, true, "unable to increase
tracked resource: queuepath "+queuePathParent+", app "+TestApp1+", res
"+usage.String())
-
+ manager.IncreaseTrackedResource(queuePathParent,
TestApp1, usage, tc.user)
headroom := manager.Headroom(queuePathParent, TestApp1,
tc.user)
expectedHeadroom, err :=
resources.NewResourceFromConf(tinyResource)
if err != nil {
@@ -1247,8 +1179,7 @@ func TestUserGroupLimit(t *testing.T) { //nolint:funlen
headroom := manager.Headroom(queuePathParent, TestApp1,
tc.user)
assert.Equal(t, resources.Equals(headroom,
initExpectedHeadroom), true, "init headroom is not expected")
- increased :=
manager.IncreaseTrackedResource(queuePathParent, TestApp1, usage, tc.user)
- assert.Equal(t, increased, true, "unable to increase
tracked resource: queuepath "+queuePathParent+", app "+TestApp1+", res
"+usage.String())
+ manager.IncreaseTrackedResource(queuePathParent,
TestApp1, usage, tc.user)
userTracker := manager.GetUserTracker(tc.user.User)
assert.Equal(t, userTracker != nil, true,
fmt.Sprintf("can't get user tracker: %s", tc.user.User))
assert.Equal(t,
resources.Equals(userTracker.queueTracker.resourceUsage, usage), true, "user
tracker resource usage is not expected at root level")
@@ -1571,14 +1502,9 @@ func TestUserGroupLimitChange(t *testing.T) {
//nolint:funlen
t.Errorf("new resource create returned error or
wrong resource: error %t, res %v", err, usage)
}
- increased :=
manager.IncreaseTrackedResource(queuePathParent, TestApp1, usage, tc.user)
- assert.Equal(t, increased, true, "unable to increase
tracked resource: queuepath "+queuePathParent+", app "+TestApp1+", res
"+usage.String())
-
- increased =
manager.IncreaseTrackedResource(queuePathParent, TestApp2, usage, tc.user)
- assert.Equal(t, increased, true, "unable to increase
tracked resource: queuepath "+queuePathParent+", app "+TestApp2+", res
"+usage.String())
-
- decreased :=
manager.DecreaseTrackedResource(queuePathParent, TestApp2, usage, tc.user, true)
- assert.Equal(t, decreased, true, "unable to decreased
tracked resource: queuepath "+queuePathParent+", app "+TestApp2+", res
"+usage.String())
+ manager.IncreaseTrackedResource(queuePathParent,
TestApp1, usage, tc.user)
+ manager.IncreaseTrackedResource(queuePathParent,
TestApp2, usage, tc.user)
+ manager.DecreaseTrackedResource(queuePathParent,
TestApp2, usage, tc.user, true)
conf.Queues[0].Queues[0].Limits = tc.newLimits
assert.NilError(t, manager.UpdateConfig(conf.Queues[0],
"root"))
@@ -1610,22 +1536,16 @@ func TestMultipleGroupLimitChange(t *testing.T) {
}
// all users can increase usage within the quota
- increased := manager.IncreaseTrackedResource(queuePathParent,
"test-app-1-1", usage, user1)
- assert.Equal(t, increased, true, "unable to increase tracked resource:
queuepath "+queuePathParent+", app test-app-1-1, res "+usage.String())
-
- increased = manager.IncreaseTrackedResource(queuePathParent,
"test-app-2-1", usage, user2)
- assert.Equal(t, increased, true, "unable to increase tracked resource:
queuepath "+queuePathParent+", app test-app-2-1, res "+usage.String())
-
- increased = manager.IncreaseTrackedResource(queuePathParent,
"test-app-3-1", usage, user3)
- assert.Equal(t, increased, true, "unable to increase tracked resource:
queuepath "+queuePathParent+", app test-app-3-1, res "+usage.String())
+ manager.IncreaseTrackedResource(queuePathParent, "test-app-1-1", usage,
user1)
+ manager.IncreaseTrackedResource(queuePathParent, "test-app-2-1", usage,
user2)
+ manager.IncreaseTrackedResource(queuePathParent, "test-app-3-1", usage,
user3)
// remove group2 from the specific group
conf.Queues[0].Queues[0].Limits[0].Groups = []string{"group1"}
assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
// user1 still can increase usage within the quota
- increased = manager.IncreaseTrackedResource(queuePathParent,
"test-app-1-2", usage, user1)
- assert.Equal(t, increased, true, "unable to increase tracked resource:
queuepath "+queuePathParent+", app test-app-1-2, res "+usage.String())
+ manager.IncreaseTrackedResource(queuePathParent, "test-app-1-2", usage,
user1)
// user2 can't increase usage more than wildcard limit
headroom := manager.Headroom(queuePathParent, "test-app-2-2", user2)
diff --git a/pkg/scheduler/ugm/queue_tracker.go
b/pkg/scheduler/ugm/queue_tracker.go
index fe2177a7..2f5a470d 100644
--- a/pkg/scheduler/ugm/queue_tracker.go
+++ b/pkg/scheduler/ugm/queue_tracker.go
@@ -88,7 +88,7 @@ const (
)
// Note: Lock free call. The Lock of the linked tracker (UserTracker and
GroupTracker) should be held before calling this function.
-func (qt *QueueTracker) increaseTrackedResource(hierarchy []string,
applicationID string, trackType trackingType, usage *resources.Resource) bool {
+func (qt *QueueTracker) increaseTrackedResource(hierarchy []string,
applicationID string, trackType trackingType, usage *resources.Resource) {
log.Log(log.SchedUGM).Debug("Increasing resource usage",
zap.Int("tracking type", int(trackType)),
zap.String("queue path", qt.queuePath),
@@ -103,9 +103,7 @@ func (qt *QueueTracker) increaseTrackedResource(hierarchy
[]string, applicationI
if qt.childQueueTrackers[childName] == nil {
qt.childQueueTrackers[childName] =
newQueueTracker(qt.queuePath, childName, trackType)
}
- if
!qt.childQueueTrackers[childName].increaseTrackedResource(hierarchy[1:],
applicationID, trackType, usage) {
- return false
- }
+
qt.childQueueTrackers[childName].increaseTrackedResource(hierarchy[1:],
applicationID, trackType, usage)
}
if qt.resourceUsage == nil {
qt.resourceUsage = resources.NewResource()
@@ -122,11 +120,10 @@ func (qt *QueueTracker) increaseTrackedResource(hierarchy
[]string, applicationI
zap.Bool("use wild card", qt.useWildCard),
zap.Stringer("total resource after increasing",
qt.resourceUsage),
zap.Int("total applications after increasing",
len(qt.runningApplications)))
- return true
}
// Note: Lock free call. The Lock of the linked tracker (UserTracker and
GroupTracker) should be held before calling this function.
-func (qt *QueueTracker) decreaseTrackedResource(hierarchy []string,
applicationID string, usage *resources.Resource, removeApp bool) (bool, bool) {
+func (qt *QueueTracker) decreaseTrackedResource(hierarchy []string,
applicationID string, usage *resources.Resource, removeApp bool) bool {
log.Log(log.SchedUGM).Debug("Decreasing resource usage",
zap.String("queue path", qt.queuePath),
zap.Strings("hierarchy", hierarchy),
@@ -140,12 +137,9 @@ func (qt *QueueTracker) decreaseTrackedResource(hierarchy
[]string, applicationI
if qt.childQueueTrackers[childName] == nil {
log.Log(log.SchedUGM).Error("Child queueTracker tracker
must be available in child queues map",
zap.String("child queueTracker name",
childName))
- return false, false
- }
- removeQT, decreased :=
qt.childQueueTrackers[childName].decreaseTrackedResource(hierarchy[1:],
applicationID, usage, removeApp)
- if !decreased {
- return false, decreased
+ return false
}
+ removeQT :=
qt.childQueueTrackers[childName].decreaseTrackedResource(hierarchy[1:],
applicationID, usage, removeApp)
if removeQT {
log.Log(log.SchedUGM).Debug("Removed queue tracker
linkage from its parent",
zap.String("queue path ", qt.queuePath),
@@ -175,7 +169,7 @@ func (qt *QueueTracker) decreaseTrackedResource(hierarchy
[]string, applicationI
log.Log(log.SchedUGM).Debug("Remove queue tracker",
zap.String("queue path ", qt.queuePath),
zap.Bool("remove QT", removeQT))
- return removeQT, true
+ return removeQT
}
// Note: Lock free call. The Lock of the linked tracker (UserTracker and
GroupTracker) should be held before calling this function.
diff --git a/pkg/scheduler/ugm/queue_tracker_test.go
b/pkg/scheduler/ugm/queue_tracker_test.go
index 23a92c95..1bdd36da 100644
--- a/pkg/scheduler/ugm/queue_tracker_test.go
+++ b/pkg/scheduler/ugm/queue_tracker_test.go
@@ -40,37 +40,25 @@ func TestQTIncreaseTrackedResource(t *testing.T) {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage1)
}
- result :=
queueTracker.increaseTrackedResource(strings.Split(queuePath1, configs.DOT),
TestApp1, user, usage1)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp1, usage1)
- }
+ queueTracker.increaseTrackedResource(strings.Split(queuePath1,
configs.DOT), TestApp1, user, usage1)
usage2, err := resources.NewResourceFromConf(map[string]string{"mem":
"20M", "vcore": "20"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage2)
}
- result = queueTracker.increaseTrackedResource(strings.Split(queuePath2,
configs.DOT), TestApp2, user, usage2)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath2, TestApp2, usage2)
- }
+ queueTracker.increaseTrackedResource(strings.Split(queuePath2,
configs.DOT), TestApp2, user, usage2)
usage3, err := resources.NewResourceFromConf(map[string]string{"mem":
"30M", "vcore": "30"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- result = queueTracker.increaseTrackedResource(strings.Split(queuePath3,
configs.DOT), TestApp3, user, usage3)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath3, TestApp3, usage3)
- }
+ queueTracker.increaseTrackedResource(strings.Split(queuePath3,
configs.DOT), TestApp3, user, usage3)
usage4, err := resources.NewResourceFromConf(map[string]string{"mem":
"20M", "vcore": "20"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- result = queueTracker.increaseTrackedResource(strings.Split(queuePath4,
configs.DOT), TestApp4, user, usage4)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath4, TestApp4, usage4)
- }
+ queueTracker.increaseTrackedResource(strings.Split(queuePath4,
configs.DOT), TestApp4, user, usage4)
actualResources := getQTResource(queueTracker)
assert.Equal(t, "map[mem:80000000 vcore:80000]",
actualResources["root"].String(), "wrong resource")
@@ -93,20 +81,14 @@ func TestQTDecreaseTrackedResource(t *testing.T) {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage1)
}
- result :=
queueTracker.increaseTrackedResource(strings.Split(queuePath1, configs.DOT),
TestApp1, user, usage1)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp1, usage1)
- }
+ queueTracker.increaseTrackedResource(strings.Split(queuePath1,
configs.DOT), TestApp1, user, usage1)
assert.Equal(t, 1, len(queueTracker.runningApplications))
usage2, err := resources.NewResourceFromConf(map[string]string{"mem":
"20M", "vcore": "20"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage2)
}
- result = queueTracker.increaseTrackedResource(strings.Split(queuePath2,
configs.DOT), TestApp2, user, usage2)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath2, TestApp2, usage2)
- }
+ queueTracker.increaseTrackedResource(strings.Split(queuePath2,
configs.DOT), TestApp2, user, usage2)
actualResources := getQTResource(queueTracker)
assert.Equal(t, 2, len(queueTracker.runningApplications))
@@ -120,16 +102,10 @@ func TestQTDecreaseTrackedResource(t *testing.T) {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- removeQT, decreased :=
queueTracker.decreaseTrackedResource(strings.Split(queuePath1, configs.DOT),
TestApp1, usage3, false)
- if !decreased {
- t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", queuePath1, TestApp1, usage3, err)
- }
+ removeQT :=
queueTracker.decreaseTrackedResource(strings.Split(queuePath1, configs.DOT),
TestApp1, usage3, false)
assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
- removeQT, decreased =
queueTracker.decreaseTrackedResource(strings.Split(queuePath2, configs.DOT),
TestApp2, usage3, false)
- if !decreased {
- t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", queuePath2, TestApp2, usage3, err)
- }
+ removeQT =
queueTracker.decreaseTrackedResource(strings.Split(queuePath2, configs.DOT),
TestApp2, usage3, false)
actualResources1 := getQTResource(queueTracker)
assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
@@ -143,10 +119,7 @@ func TestQTDecreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- removeQT, decreased =
queueTracker.decreaseTrackedResource(strings.Split(queuePath1, configs.DOT),
TestApp1, usage4, true)
- if !decreased {
- t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", queuePath1, TestApp1, usage1, err)
- }
+ removeQT =
queueTracker.decreaseTrackedResource(strings.Split(queuePath1, configs.DOT),
TestApp1, usage4, true)
assert.Equal(t, 1, len(queueTracker.runningApplications))
assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
// Make sure childQueueTracker cleaned
@@ -156,30 +129,21 @@ func TestQTDecreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage5)
}
- removeQT, decreased =
queueTracker.decreaseTrackedResource(strings.Split(queuePath2, configs.DOT),
TestApp2, usage5, true)
- if !decreased {
- t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", queuePath2, TestApp2, usage2, err)
- }
+ removeQT =
queueTracker.decreaseTrackedResource(strings.Split(queuePath2, configs.DOT),
TestApp2, usage5, true)
assert.Equal(t, 0, len(queueTracker.runningApplications))
// Make sure all childQueueTracker cleaned
assert.Equal(t, len(queueTracker.childQueueTrackers), 0)
assert.Equal(t, removeQT, true, "wrong remove queue tracker value")
// Test parent queueTracker has non-zero usage, but child queueTrackers
have all been deleted
- result = queueTracker.increaseTrackedResource(strings.Split(queuePath1,
configs.DOT), TestApp1, user, usage1)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp1, usage1)
- }
+ queueTracker.increaseTrackedResource(strings.Split(queuePath1,
configs.DOT), TestApp1, user, usage1)
assert.Equal(t, 1, len(queueTracker.runningApplications))
usage2, err = resources.NewResourceFromConf(map[string]string{"mem":
"20M", "vcore": "20"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage2)
}
- result = queueTracker.increaseTrackedResource([]string{"root",
"parent"}, TestApp2, user, usage2)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", "root.parent", TestApp2, usage2)
- }
+ queueTracker.increaseTrackedResource([]string{"root", "parent"},
TestApp2, user, usage2)
}
func TestQTQuotaEnforcement(t *testing.T) {
@@ -216,34 +180,13 @@ func TestQTQuotaEnforcement(t *testing.T) {
child2QueueTracker.maxRunningApps = 2
parentQueueTracker.childQueueTrackers["child2"] = child2QueueTracker
- result :=
queueTracker.increaseTrackedResource(strings.Split(queuePath1, configs.DOT),
TestApp1, user, usage1)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp1, usage1)
- }
-
- result = queueTracker.increaseTrackedResource(strings.Split(queuePath2,
configs.DOT), TestApp2, user, usage1)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath2, TestApp2, usage1)
- }
-
- result = queueTracker.increaseTrackedResource(strings.Split(queuePath2,
configs.DOT), TestApp2, user, usage1)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath2, TestApp2, usage1)
- }
-
+ queueTracker.increaseTrackedResource(strings.Split(queuePath1,
configs.DOT), TestApp1, user, usage1)
+ queueTracker.increaseTrackedResource(strings.Split(queuePath2,
configs.DOT), TestApp2, user, usage1)
+ queueTracker.increaseTrackedResource(strings.Split(queuePath2,
configs.DOT), TestApp2, user, usage1)
headroom := queueTracker.headroom(strings.Split(queuePath2,
configs.DOT), user)
assert.Equal(t, headroom.FitInMaxUndef(usage1), false)
-
- result = queueTracker.increaseTrackedResource(strings.Split(queuePath3,
configs.DOT), TestApp3, user, usage1)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath3, TestApp3, usage1)
- }
-
- result = queueTracker.increaseTrackedResource(strings.Split(queuePath4,
configs.DOT), TestApp4, user, usage1)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath4, TestApp4, usage1)
- }
-
+ queueTracker.increaseTrackedResource(strings.Split(queuePath3,
configs.DOT), TestApp3, user, usage1)
+ queueTracker.increaseTrackedResource(strings.Split(queuePath4,
configs.DOT), TestApp4, user, usage1)
headroom = queueTracker.headroom(strings.Split(queuePath4,
configs.DOT), user)
assert.Equal(t, headroom.FitInMaxUndef(usage1), false)
}
diff --git a/pkg/scheduler/ugm/user_tracker.go
b/pkg/scheduler/ugm/user_tracker.go
index 5e395ece..f06581b3 100644
--- a/pkg/scheduler/ugm/user_tracker.go
+++ b/pkg/scheduler/ugm/user_tracker.go
@@ -56,35 +56,22 @@ func newUserTracker(userName string, ugmEvents *ugmEvents)
*UserTracker {
return userTracker
}
-func (ut *UserTracker) increaseTrackedResource(queuePath string, applicationID
string, usage *resources.Resource) bool {
+func (ut *UserTracker) increaseTrackedResource(queuePath string, applicationID
string, usage *resources.Resource) {
ut.Lock()
defer ut.Unlock()
- hierarchy := strings.Split(queuePath, configs.DOT)
ut.events.sendIncResourceUsageForUser(ut.userName, queuePath, usage)
- increased := ut.queueTracker.increaseTrackedResource(hierarchy,
applicationID, user, usage)
- if increased {
- gt := ut.appGroupTrackers[applicationID]
- log.Log(log.SchedUGM).Debug("Increasing resource usage for
group",
- zap.String("group", gt.getName()),
- zap.String("queue path", queuePath),
- zap.String("application", applicationID),
- zap.Stringer("resource", usage))
- increasedGroupUsage := gt.increaseTrackedResource(queuePath,
applicationID, usage, ut.userName)
- if !increasedGroupUsage {
- _, decreased :=
ut.queueTracker.decreaseTrackedResource(hierarchy, applicationID, usage, false)
- if !decreased {
- log.Log(log.SchedUGM).Error("User resource
usage rollback has failed",
- zap.String("queue path", queuePath),
- zap.String("application",
applicationID),
- zap.String("user", ut.userName))
- }
- }
- return increasedGroupUsage
- }
- return increased
+ hierarchy := strings.Split(queuePath, configs.DOT)
+ ut.queueTracker.increaseTrackedResource(hierarchy, applicationID, user,
usage)
+ gt := ut.appGroupTrackers[applicationID]
+ log.Log(log.SchedUGM).Debug("Increasing resource usage for group",
+ zap.String("group", gt.getName()),
+ zap.Strings("queue path", hierarchy),
+ zap.String("application", applicationID),
+ zap.Stringer("resource", usage))
+ gt.increaseTrackedResource(queuePath, applicationID, usage, ut.userName)
}
-func (ut *UserTracker) decreaseTrackedResource(queuePath string, applicationID
string, usage *resources.Resource, removeApp bool) (bool, bool) {
+func (ut *UserTracker) decreaseTrackedResource(queuePath string, applicationID
string, usage *resources.Resource, removeApp bool) bool {
ut.Lock()
defer ut.Unlock()
ut.events.sendDecResourceUsageForUser(ut.userName, queuePath, usage)
diff --git a/pkg/scheduler/ugm/user_tracker_test.go
b/pkg/scheduler/ugm/user_tracker_test.go
index d07c1e21..94598b96 100644
--- a/pkg/scheduler/ugm/user_tracker_test.go
+++ b/pkg/scheduler/ugm/user_tracker_test.go
@@ -65,20 +65,14 @@ func TestIncreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage1)
}
- result := userTracker.increaseTrackedResource(path1, TestApp1, usage1)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path1, TestApp1, usage1)
- }
+ userTracker.increaseTrackedResource(path1, TestApp1, usage1)
groupTracker := newGroupTracker(user.User,
newUGMEvents(mock.NewEventSystemDisabled()))
userTracker.setGroupForApp(TestApp1, groupTracker)
usage2, err := resources.NewResourceFromConf(map[string]string{"mem":
"20M", "vcore": "20"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage2)
}
- result = userTracker.increaseTrackedResource(path2, TestApp2, usage2)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", path2, TestApp2, usage2, err)
- }
+ userTracker.increaseTrackedResource(path2, TestApp2, usage2)
assert.Equal(t, 3, len(eventSystem.Events))
assert.Equal(t, si.EventRecord_UG_USER_RESOURCE,
eventSystem.Events[0].EventChangeDetail)
assert.Equal(t, si.EventRecord_ADD,
eventSystem.Events[0].EventChangeType)
@@ -95,24 +89,17 @@ func TestIncreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- result = userTracker.increaseTrackedResource(path3, TestApp3, usage3)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", path3, TestApp3, usage3, err)
- }
+ userTracker.increaseTrackedResource(path3, TestApp3, usage3)
userTracker.setGroupForApp(TestApp3, groupTracker)
usage4, err := resources.NewResourceFromConf(map[string]string{"mem":
"20M", "vcore": "20"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- result = userTracker.increaseTrackedResource(path4, TestApp4, usage4)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", path4, TestApp4, usage4, err)
- }
+ userTracker.increaseTrackedResource(path4, TestApp4, usage4)
userTracker.setGroupForApp(TestApp4, groupTracker)
actualResources := getUserResource(userTracker)
-
assert.Equal(t, "map[mem:80000000 vcore:80000]",
actualResources["root"].String(), "wrong resource")
assert.Equal(t, "map[mem:80000000 vcore:80000]",
actualResources["root.parent"].String(), "wrong resource")
assert.Equal(t, "map[mem:40000000 vcore:40000]",
actualResources["root.parent.child1"].String(), "wrong resource")
@@ -136,10 +123,7 @@ func TestDecreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage1)
}
- result := userTracker.increaseTrackedResource(path1, TestApp1, usage1)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", path1, TestApp1, usage1, err)
- }
+ userTracker.increaseTrackedResource(path1, TestApp1, usage1)
groupTracker := newGroupTracker(user.User,
newUGMEvents(mock.NewEventSystemDisabled()))
userTracker.setGroupForApp(TestApp1, groupTracker)
assert.Equal(t, 1, len(userTracker.getTrackedApplications()))
@@ -148,10 +132,7 @@ func TestDecreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage2)
}
- result = userTracker.increaseTrackedResource(path2, TestApp2, usage2)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", path2, TestApp2, usage2, err)
- }
+ userTracker.increaseTrackedResource(path2, TestApp2, usage2)
userTracker.setGroupForApp(TestApp2, groupTracker)
actualResources := getUserResource(userTracker)
@@ -166,18 +147,12 @@ func TestDecreaseTrackedResource(t *testing.T) {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
eventSystem.Reset()
- removeQT, decreased := userTracker.decreaseTrackedResource(path1,
TestApp1, usage3, false)
- if !decreased {
- t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", path1, TestApp1, usage3, err)
- }
+ removeQT := userTracker.decreaseTrackedResource(path1, TestApp1,
usage3, false)
assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
assert.Equal(t, si.EventRecord_UG_USER_RESOURCE,
eventSystem.Events[0].EventChangeDetail)
assert.Equal(t, si.EventRecord_REMOVE,
eventSystem.Events[0].EventChangeType)
- removeQT, decreased = userTracker.decreaseTrackedResource(path2,
TestApp2, usage3, false)
- if !decreased {
- t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", path1, TestApp2, usage3, err)
- }
+ removeQT = userTracker.decreaseTrackedResource(path2, TestApp2, usage3,
false)
actualResources1 := getUserResource(userTracker)
assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
@@ -192,10 +167,7 @@ func TestDecreaseTrackedResource(t *testing.T) {
}
eventSystem.Reset()
- removeQT, decreased = userTracker.decreaseTrackedResource(path1,
TestApp1, usage4, true)
- if !decreased {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", path1, TestApp1, usage1, err)
- }
+ removeQT = userTracker.decreaseTrackedResource(path1, TestApp1, usage4,
true)
assert.Equal(t, 1, len(userTracker.getTrackedApplications()))
assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
assert.Equal(t, 2, len(eventSystem.Events))
@@ -208,10 +180,7 @@ func TestDecreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage5)
}
- removeQT, decreased = userTracker.decreaseTrackedResource(path2,
TestApp2, usage5, true)
- if !decreased {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", path2, TestApp2, usage2, err)
- }
+ removeQT = userTracker.decreaseTrackedResource(path2, TestApp2, usage5,
true)
assert.Equal(t, 0, len(userTracker.getTrackedApplications()))
assert.Equal(t, removeQT, true, "wrong remove queue tracker value")
}
@@ -228,10 +197,7 @@ func TestSetAndClearMaxLimits(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage1)
}
- result := userTracker.increaseTrackedResource(path1, TestApp1, usage1)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", path1, TestApp1, usage1, err)
- }
+ userTracker.increaseTrackedResource(path1, TestApp1, usage1)
eventSystem.Reset()
userTracker.setLimits(path1, resources.Multiply(usage1, 5), 5, false,
false)
@@ -244,15 +210,8 @@ func TestSetAndClearMaxLimits(t *testing.T) {
assert.Equal(t, si.EventRecord_SET,
eventSystem.Events[1].EventChangeType)
assert.Equal(t, path5, eventSystem.Events[1].ReferenceID)
- result = userTracker.increaseTrackedResource(path1, TestApp1, usage1)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path1, TestApp1, usage1)
- }
-
- result = userTracker.increaseTrackedResource(path1, TestApp2, usage1)
- if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path1, TestApp2, usage1)
- }
+ userTracker.increaseTrackedResource(path1, TestApp1, usage1)
+ userTracker.increaseTrackedResource(path1, TestApp2, usage1)
path1expectedHeadroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{
"mem": 20000000,
"vcore": 20000,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]