This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new dee60d15 [YUNIKORN-2012] group quota is not maintained (#668)
dee60d15 is described below
commit dee60d15ee776d3c9890cacc5538ffa3cbf236fa
Author: Manikandan R <[email protected]>
AuthorDate: Fri Nov 3 12:11:05 2023 +1100
[YUNIKORN-2012] group quota is not maintained (#668)
In certain cases it looks like group quota is not considered during
scheduling. This is caused by incorrectly maintaining the quotas in the
user group manager. The quota was reset while it should not have been,
causing the quota to never be checked.
Further test code added in YUNIKORN-2024 via #669
Closes: #668
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
pkg/scheduler/partition_test.go | 1 +
pkg/scheduler/ugm/group_tracker.go | 3 +-
pkg/scheduler/ugm/manager.go | 254 +++++++++++++++++++++++--------------
pkg/scheduler/ugm/manager_test.go | 119 ++++++++++++-----
pkg/scheduler/ugm/queue_tracker.go | 33 ++++-
pkg/scheduler/ugm/user_tracker.go | 3 +-
6 files changed, 275 insertions(+), 138 deletions(-)
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 186fa315..aa867f0f 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -45,6 +45,7 @@ func setupUGM() {
userManager := ugm.GetUserManager()
userManager.ClearUserTrackers()
userManager.ClearGroupTrackers()
+ userManager.ClearConfigLimits()
}
func setupNode(t *testing.T, nodeID string, partition *PartitionContext,
nodeRes *resources.Resource) *objects.Node {
diff --git a/pkg/scheduler/ugm/group_tracker.go
b/pkg/scheduler/ugm/group_tracker.go
index f6f316f0..62304216 100644
--- a/pkg/scheduler/ugm/group_tracker.go
+++ b/pkg/scheduler/ugm/group_tracker.go
@@ -118,11 +118,10 @@ func (gt *GroupTracker) UnlinkQT(queuePath string) bool {
return gt.queueTracker.UnlinkQT(strings.Split(queuePath, configs.DOT))
}
-// canBeRemoved Does "root" queue has any child queue trackers? Is there any
running applications in "root" qt?
func (gt *GroupTracker) canBeRemoved() bool {
gt.RLock()
defer gt.RUnlock()
- return len(gt.queueTracker.childQueueTrackers) == 0 &&
len(gt.queueTracker.runningApplications) == 0
+ return gt.queueTracker.canBeRemoved()
}
func (gt *GroupTracker) getName() string {
diff --git a/pkg/scheduler/ugm/manager.go b/pkg/scheduler/ugm/manager.go
index fd40aafe..0f431bc7 100644
--- a/pkg/scheduler/ugm/manager.go
+++ b/pkg/scheduler/ugm/manager.go
@@ -39,9 +39,11 @@ var m *Manager
type Manager struct {
userTrackers map[string]*UserTracker
groupTrackers map[string]*GroupTracker
- userWildCardLimitsConfig map[string]*LimitConfig // Hold limits
settings of user '*'
- groupWildCardLimitsConfig map[string]*LimitConfig // Hold limits
settings of group '*'
- configuredGroups map[string][]string // Hold groups for
all configured queue paths.
+ userWildCardLimitsConfig map[string]*LimitConfig // Hold
limits settings of user '*'
+ groupWildCardLimitsConfig map[string]*LimitConfig // Hold
limits settings of group '*'
+ configuredGroups map[string][]string // Hold
groups for all configured queue paths.
+ userLimits map[string]map[string]*LimitConfig // Holds
queue path * user limit config
+ groupLimits map[string]map[string]*LimitConfig // Holds
queue path * group limit config
sync.RWMutex
}
@@ -312,19 +314,29 @@ func (m *Manager) isGroupRemovable(gt *GroupTracker) bool
{
}
func (m *Manager) UpdateConfig(config configs.QueueConfig, queuePath string)
error {
- m.Lock()
- defer m.Unlock()
+ userWildCardLimitsConfig := make(map[string]*LimitConfig)
+ groupWildCardLimitsConfig := make(map[string]*LimitConfig)
+ configuredGroups := make(map[string][]string)
- m.userWildCardLimitsConfig = make(map[string]*LimitConfig)
- m.groupWildCardLimitsConfig = make(map[string]*LimitConfig)
- m.configuredGroups = make(map[string][]string)
- return m.internalProcessConfig(config, queuePath)
+ userLimits := make(map[string]map[string]*LimitConfig) // Holds queue
path * user limit config
+ groupLimits := make(map[string]map[string]*LimitConfig) // Holds queue
path * group limit config
+
+ // as and when parse new configs, store them in temporary maps
+ if err := m.internalProcessConfig(config, queuePath, userLimits,
groupLimits, userWildCardLimitsConfig, groupWildCardLimitsConfig,
configuredGroups); err != nil {
+ return err
+ }
+
+ // compare existing config with new configs stored in above temporary
maps
+ m.clearEarlierSetLimits(userLimits, groupLimits)
+
+ // switch over - replace the existing config with new configs
+ m.replaceLimitConfigs(userLimits, groupLimits,
userWildCardLimitsConfig, groupWildCardLimitsConfig, configuredGroups)
+
+ return nil
}
-func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath
string) error {
- // Holds user and group for which limits have been configured with
specific queue path
- userLimits := make(map[string]bool)
- groupLimits := make(map[string]bool)
+func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath
string, newUserLimits map[string]map[string]*LimitConfig, newGroupLimits
map[string]map[string]*LimitConfig,
+ newUserWildCardLimitsConfig map[string]*LimitConfig,
newGroupWildCardLimitsConfig map[string]*LimitConfig, newConfiguredGroups
map[string][]string) error {
// Traverse limits of specific queue path
for _, limit := range cur.Limits {
var maxResource *resources.Resource
@@ -348,12 +360,16 @@ func (m *Manager) internalProcessConfig(cur
configs.QueueConfig, queuePath strin
zap.Uint64("max application",
limit.MaxApplications),
zap.Any("max resources", limit.MaxResources))
if user == common.Wildcard {
- m.userWildCardLimitsConfig[queuePath] =
limitConfig
+ newUserWildCardLimitsConfig[queuePath] =
limitConfig
continue
}
- if err := m.processUserConfig(user, limitConfig,
queuePath, userLimits); err != nil {
+ if err := m.setUserLimits(user, limitConfig,
queuePath); err != nil {
return err
}
+ if _, ok := newUserLimits[queuePath]; !ok {
+ newUserLimits[queuePath] =
make(map[string]*LimitConfig)
+ }
+ newUserLimits[queuePath][user] = limitConfig
}
for _, group := range limit.Groups {
if group == common.Empty {
@@ -365,24 +381,24 @@ func (m *Manager) internalProcessConfig(cur
configs.QueueConfig, queuePath strin
zap.String("queue path", queuePath),
zap.Uint64("max application",
limit.MaxApplications),
zap.Any("max resources", limit.MaxResources))
- if err := m.processGroupConfig(group, limitConfig,
queuePath, groupLimits); err != nil {
+ if err := m.setGroupLimits(group, limitConfig,
queuePath); err != nil {
return err
}
+ if _, ok := newGroupLimits[queuePath]; !ok {
+ newGroupLimits[queuePath] =
make(map[string]*LimitConfig)
+ }
+ newGroupLimits[queuePath][group] = limitConfig
if group == common.Wildcard {
- m.groupWildCardLimitsConfig[queuePath] =
limitConfig
+ newGroupWildCardLimitsConfig[queuePath] =
limitConfig
} else {
- m.configuredGroups[queuePath] =
append(m.configuredGroups[queuePath], group)
+ newConfiguredGroups[queuePath] =
append(newConfiguredGroups[queuePath], group)
}
}
}
- if err := m.clearEarlierSetLimits(userLimits, groupLimits, queuePath);
err != nil {
- return err
- }
-
if len(cur.Queues) > 0 {
for _, child := range cur.Queues {
childQueuePath := queuePath + configs.DOT + child.Name
- if err := m.internalProcessConfig(child,
childQueuePath); err != nil {
+ if err := m.internalProcessConfig(child,
childQueuePath, newUserLimits, newGroupLimits, newUserWildCardLimitsConfig,
newGroupWildCardLimitsConfig, newConfiguredGroups); err != nil {
return err
}
}
@@ -390,102 +406,132 @@ func (m *Manager) internalProcessConfig(cur
configs.QueueConfig, queuePath strin
return nil
}
-func (m *Manager) processUserConfig(user string, limitConfig *LimitConfig,
queuePath string, userLimits map[string]bool) error {
- if err := m.setUserLimits(user, limitConfig, queuePath); err != nil {
- return err
- }
- userLimits[user] = true
- return nil
-}
+// clearEarlierSetLimits Clear already configured limits of users and groups
for which limits have been configured before but not now
+func (m *Manager) clearEarlierSetLimits(newUserLimits
map[string]map[string]*LimitConfig, newGroupLimits
map[string]map[string]*LimitConfig) {
+ // Clear already configured limits of group for which limits have been
configured before but not now
+ m.clearEarlierSetGroupLimits(newGroupLimits)
-func (m *Manager) processGroupConfig(group string, limitConfig *LimitConfig,
queuePath string, groupLimits map[string]bool) error {
- if err := m.setGroupLimits(group, limitConfig, queuePath); err != nil {
- return err
- }
- groupLimits[group] = true
- return nil
+ // Clear already configured limits of user for which limits have been
configured before but not now
+ m.clearEarlierSetUserLimits(newUserLimits)
}
-// clearEarlierSetLimits Clear already configured limits of users and groups
for which limits have been configured before but not now
-func (m *Manager) clearEarlierSetLimits(userLimits map[string]bool,
groupLimits map[string]bool, queuePath string) error {
- // Clear already configured limits of group for which limits have been
configured before but not now
- for _, gt := range m.groupTrackers {
- appUsersMap := m.clearEarlierSetGroupLimits(gt, queuePath,
groupLimits)
- if len(appUsersMap) > 0 {
- for app, user := range appUsersMap {
- ut := m.userTrackers[user]
- ut.setGroupForApp(app, nil)
+// clearEarlierSetUserLimits Traverse new user config and decide whether
earlier usage needs to be cleared or not
+// by comparing with the existing config. Reset earlier usage only config set
earlier but not now
+func (m *Manager) clearEarlierSetUserLimits(newUserLimits
map[string]map[string]*LimitConfig) {
+ m.RLock()
+ defer m.RUnlock()
+ for queuePath, limitConfig := range m.userLimits {
+ // Is queue path exists?
+ if newUserLimit, ok := newUserLimits[queuePath]; !ok {
+ for u := range limitConfig {
+ if ut, utExists := m.userTrackers[u]; utExists {
+ m.resetUserEarlierUsage(ut, queuePath)
+ }
+ }
+ } else {
+ // Queue path exists. Is user exists?
+ for u := range limitConfig {
+ if _, ulExists := newUserLimit[u]; !ulExists {
+ if ut, utExists := m.userTrackers[u];
utExists {
+ m.resetUserEarlierUsage(ut,
queuePath)
+ }
+ }
}
}
}
-
- // Clear already configured limits of user for which limits have been
configured before but not now
- for _, ut := range m.userTrackers {
- m.clearEarlierSetUserLimits(ut, queuePath, userLimits)
- }
- return nil
}
-func (m *Manager) clearEarlierSetUserLimits(ut *UserTracker, queuePath string,
userLimits map[string]bool) {
+// resetUserEarlierUsage Clear or reset earlier usage only when user already
tracked for the queue path.
+// Reset the max apps and max resources to default, unlink the end leaf queue
of queue path from its immediate parent and
+// eventually remove user tracker object itself from ugm if it can be removed.
+func (m *Manager) resetUserEarlierUsage(ut *UserTracker, queuePath string) {
// Is this user already tracked for the queue path?
if ut.IsQueuePathTrackedCompletely(queuePath) {
- u := ut.userName
- // Is there any limit config set for user in the current
configuration? If not, then clear those old limit settings
- if _, ok := userLimits[u]; !ok {
- log.Log(log.SchedUGM).Debug("Need to clear earlier set
configs for user",
- zap.String("user", u),
- zap.String("queue path", queuePath))
- // Is there any running applications in end queue of
this queue path? If not, then remove the linkage between end queue and its
immediate parent
- if ut.IsUnlinkRequired(queuePath) {
- ut.UnlinkQT(queuePath)
- } else {
- ut.setLimits(queuePath,
resources.NewResource(), 0)
- log.Log(log.SchedUGM).Debug("Cleared earlier
set limit configs for user",
- zap.String("user", u),
- zap.String("queue path", queuePath))
+ log.Log(log.SchedUGM).Debug("Need to clear earlier set configs
for user",
+ zap.String("user", ut.userName),
+ zap.String("queue path", queuePath))
+ ut.setLimits(queuePath, resources.NewResource(), 0)
+ // Is there any running applications in end queue of this queue
path? If not, then remove the linkage between end queue and its immediate parent
+ if ut.IsUnlinkRequired(queuePath) {
+ ut.UnlinkQT(queuePath)
+ }
+ log.Log(log.SchedUGM).Debug("Cleared earlier set limit configs
for user",
+ zap.String("user", ut.userName),
+ zap.String("queue path", queuePath))
+ if ut.canBeRemoved() {
+ delete(m.userTrackers, ut.userName)
+ }
+ }
+}
+
+// clearEarlierSetGroupLimits Traverse new group config and decide whether
earlier usage needs to be cleared or not
+// by comparing with the existing config. Reset earlier usage only config set
earlier but not now
+func (m *Manager) clearEarlierSetGroupLimits(newGroupLimits
map[string]map[string]*LimitConfig) {
+ m.RLock()
+ defer m.RUnlock()
+ for queuePath, limitConfig := range m.groupLimits {
+ // Is queue path exists?
+ if newGroupLimit, ok := newGroupLimits[queuePath]; !ok {
+ for g := range limitConfig {
+ if gt, gtExists := m.groupTrackers[g]; gtExists
{
+ m.resetGroupEarlierUsage(gt, queuePath)
+ }
}
- // Does "root" queue has any child queue trackers? At
some point during this whole traversal, root might
- // not have any child queue trackers. When the
situation comes, remove the linkage between the user and
- // its root queue tracker
- if ut.canBeRemoved() {
- delete(m.userTrackers, ut.userName)
+ } else {
+ // Queue path exists. Is group exists?
+ for g := range limitConfig {
+ if _, glExists := newGroupLimit[g]; !glExists {
+ if gt, gtExists := m.groupTrackers[g];
gtExists {
+ m.resetGroupEarlierUsage(gt,
queuePath)
+ }
+ }
}
}
}
}
-func (m *Manager) clearEarlierSetGroupLimits(gt *GroupTracker, queuePath
string, groupLimits map[string]bool) map[string]string {
- appUsersMap := make(map[string]string)
- // Is this group already tracked for the queue path?
+// resetGroupEarlierUsage Clear or reset earlier usage only when group already
tracked for the queue path.
+// Decrease the group usage and collect the list of applications for which
user app group linkage needs to be broken.
+// Reset the max apps and max resources to default, unlink the end leaf queue
of queue path from its immediate parent and
+// eventually remove group tracker object itself from ugm if it can be removed.
+func (m *Manager) resetGroupEarlierUsage(gt *GroupTracker, queuePath string) {
if gt.IsQueuePathTrackedCompletely(queuePath) {
- g := gt.groupName
- // Is there any limit config set for group in the current
configuration? If not, then clear those old limit settings
- if ok := groupLimits[g]; !ok {
- log.Log(log.SchedUGM).Debug("Need to clear earlier set
configs for group",
- zap.String("group", g),
- zap.String("queue path", queuePath))
- appUsersMap =
gt.decreaseAllTrackedResourceUsage(queuePath)
- // Is there any running applications in end queue of
this queue path? If not, then remove the linkage between end queue and its
immediate parent
- if gt.IsUnlinkRequired(queuePath) {
- gt.UnlinkQT(queuePath)
- } else {
- gt.setLimits(queuePath,
resources.NewResource(), 0)
- log.Log(log.SchedUGM).Debug("Cleared earlier
set limit configs for group",
- zap.String("group", g),
- zap.String("queue path", queuePath))
- }
- // Does "root" queue has any child queue trackers? At
some point during this whole traversal, root might
- // not have any child queue trackers. When the
situation comes, remove the linkage between the group and
- // its root queue tracker
- if gt.canBeRemoved() {
- delete(m.groupTrackers, gt.groupName)
- }
+ log.Log(log.SchedUGM).Debug("Need to clear earlier set configs
for group",
+ zap.String("group", gt.groupName),
+ zap.String("queue path", queuePath))
+ appUsersMap := gt.decreaseAllTrackedResourceUsage(queuePath)
+ for app, u := range appUsersMap {
+ ut := m.userTrackers[u]
+ delete(ut.appGroupTrackers, app)
+ }
+ gt.setLimits(queuePath, resources.NewResource(), 0)
+ // Is there any running applications in end queue of this queue
path? If not, then remove the linkage between end queue and its immediate parent
+ if gt.IsUnlinkRequired(queuePath) {
+ gt.UnlinkQT(queuePath)
+ }
+ log.Log(log.SchedUGM).Debug("Cleared earlier set limit configs
for group",
+ zap.String("group", gt.groupName),
+ zap.String("queue path", queuePath))
+ if gt.canBeRemoved() {
+ delete(m.groupTrackers, gt.groupName)
}
}
- return appUsersMap
+}
+
+func (m *Manager) replaceLimitConfigs(newUserLimits
map[string]map[string]*LimitConfig, newGroupLimits
map[string]map[string]*LimitConfig,
+ newUserWildCardLimitsConfig map[string]*LimitConfig,
newGroupWildCardLimitsConfig map[string]*LimitConfig, newConfiguredGroups
map[string][]string) {
+ m.Lock()
+ defer m.Unlock()
+ m.userLimits = newUserLimits
+ m.groupLimits = newGroupLimits
+ m.userWildCardLimitsConfig = newUserWildCardLimitsConfig
+ m.groupWildCardLimitsConfig = newGroupWildCardLimitsConfig
+ m.configuredGroups = newConfiguredGroups
}
func (m *Manager) setUserLimits(user string, limitConfig *LimitConfig,
queuePath string) error {
+ m.Lock()
+ defer m.Unlock()
log.Log(log.SchedUGM).Debug("Setting user limits",
zap.String("user", user),
zap.String("queue path", queuePath),
@@ -504,6 +550,8 @@ func (m *Manager) setUserLimits(user string, limitConfig
*LimitConfig, queuePath
}
func (m *Manager) setGroupLimits(group string, limitConfig *LimitConfig,
queuePath string) error {
+ m.Lock()
+ defer m.Unlock()
log.Log(log.SchedUGM).Debug("Setting group limits",
zap.String("group", group),
zap.String("queue path", queuePath),
@@ -620,8 +668,20 @@ func (m *Manager) ClearUserTrackers() {
m.userTrackers = make(map[string]*UserTracker)
}
+// ClearGroupTrackers only for tests
func (m *Manager) ClearGroupTrackers() {
m.Lock()
defer m.Unlock()
m.groupTrackers = make(map[string]*GroupTracker)
}
+
+// ClearConfigLimits only for tests
+func (m *Manager) ClearConfigLimits() {
+ m.Lock()
+ defer m.Unlock()
+ m.userWildCardLimitsConfig = make(map[string]*LimitConfig)
+ m.groupWildCardLimitsConfig = make(map[string]*LimitConfig)
+ m.configuredGroups = make(map[string][]string)
+ m.userLimits = make(map[string]map[string]*LimitConfig)
+ m.groupLimits = make(map[string]map[string]*LimitConfig)
+}
diff --git a/pkg/scheduler/ugm/manager_test.go
b/pkg/scheduler/ugm/manager_test.go
index 4a0c8365..7ce5e72b 100644
--- a/pkg/scheduler/ugm/manager_test.go
+++ b/pkg/scheduler/ugm/manager_test.go
@@ -59,6 +59,7 @@ func TestUserManagerOnceInitialization(t *testing.T) {
}
func TestGetGroup(t *testing.T) {
+ setupUGM()
user := security.UserGroup{User: "test", Groups: []string{"test",
"test1"}}
manager := GetUserManager()
@@ -368,15 +369,11 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t
*testing.T) {
// should run as user 'user' setting is map[memory:60 vcores:60] and
total usage of "root.parent" is map[memory:50 vcores:50]
increased := manager.IncreaseTrackedResource(queuePath1, TestApp2,
usage, user)
- if !increased {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp1, user)
- }
+ assert.Equal(t, increased, true)
// should not run as user 'user' setting is map[memory:60 vcores:60]
and total usage of "root.parent" is map[memory:60 vcores:60]
increased = manager.IncreaseTrackedResource(queuePath1, TestApp3,
usage, user)
- if increased {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp1, user)
- }
+ assert.Equal(t, increased, false)
// configure max resource for root.parent to allow one more application
to run through wild card user settings (not through specific user)
// configure limits for user2 only. However, user1 should not be
cleared as it has running applications
@@ -390,29 +387,22 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t
*testing.T) {
// user1 still should be able to run app as wild card user '*' setting
is map[memory:70 vcores:70] for "root.parent" and
// total usage of "root.parent" is map[memory:60 vcores:60]
increased = manager.IncreaseTrackedResource(queuePath1, TestApp2,
usage, user)
- if !increased {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp2, user)
- }
+ assert.Equal(t, increased, true)
// user1 should not be able to run app as wild card user '*' setting is
map[memory:70 vcores:70] for "root.parent"
// and total usage of "root.parent" is map[memory:70 vcores:70]
increased = manager.IncreaseTrackedResource(queuePath1, TestApp3,
usage, user)
- if increased {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp3, user)
- }
+ assert.Equal(t, increased, false)
// configure max resource for group1 * root.parent (map[memory:70
vcores:70]) higher than wild card group * root.parent settings (map[memory:10
vcores:10])
// ensure group's specific settings has been used for enforcement
checks as specific limits always has higher precedence when compared to wild
card group limit settings
+ // group1 quota (map[memory:70 vcores:70]) has been removed from
root.parent.leaf.
+ // so resource usage has been decreased for the same group and that too
for entire queue hierarchy (root->parent->leaf)
+ // since group1 quota has been configured for root.parent, resource
usage would be increased from
+ // the place where it has been left.
conf = createUpdateConfigWithWildCardUsersAndGroups(user1.User,
user1.Groups[0], "*", "*", "10", "10")
assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
- // since group1 quota (map[memory:70 vcores:70]) has reached before,
further group increase is not allowed.
- // Hence, user's tracked usage would be reverted
- increased = manager.IncreaseTrackedResource(queuePath1, TestApp1,
usage, user1)
- if increased {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp1, user1)
- }
-
// configure max resource for user2 * root.parent (map[memory:70
vcores:70]) higher than wild card user * root.parent settings (map[memory:10
vcores:10])
// ensure user's specific settings has been used for enforcement checks
as specific limits always has higher precedence when compared to wild card user
limit settings
conf = createUpdateConfigWithWildCardUsersAndGroups(user1.User, "",
"*", "", "10", "10")
@@ -421,16 +411,12 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t
*testing.T) {
// can be allowed to run upto resource usage map[memory:70 vcores:70]
for i := 1; i <= 7; i++ {
increased = manager.IncreaseTrackedResource(queuePath1,
TestApp1, usage, user1)
- if !increased {
- t.Fatalf("unable to increase tracked resource:
queuepath %s, app %s, res %v", queuePath1, TestApp1, user1)
- }
+ assert.Equal(t, increased, true)
}
// user2 should not be able to run app as user2 max limit is
map[memory:70 vcores:70] and usage so far is map[memory:70 vcores:70]
increased = manager.IncreaseTrackedResource(queuePath1, TestApp1,
usage, user1)
- if increased {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp1, user1)
- }
+ assert.Equal(t, increased, false)
user3 := security.UserGroup{User: "user3", Groups: []string{"group3"}}
conf = createUpdateConfigWithWildCardUsersAndGroups(user1.User,
user1.Groups[0], "", "*", "10", "10")
@@ -438,9 +424,7 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t
*testing.T) {
// user3 should be able to run app as group3 uses wild card group limit
settings map[memory:10 vcores:10]
increased = manager.IncreaseTrackedResource(queuePath1, TestApp1,
usage, user3)
- if !increased {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp1, user)
- }
+ assert.Equal(t, increased, true)
// user4 (though belongs to different group, group4) should not be able
to run app as group4 also
// uses wild card group limit settings map[memory:10 vcores:10]
@@ -453,16 +437,12 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t
*testing.T) {
// Since app is TestApp1, gt of "*" would be used as it is already
mapped. group4 won't be used
increased = manager.IncreaseTrackedResource(queuePath1, TestApp1,
usage, user4)
- if increased {
- t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp1, user1)
- }
+ assert.Equal(t, increased, false)
// Now group4 would be used as user4 is running TestApp2 for the first
time. So can be allowed to run upto resource usage map[memory:70 vcores:70]
for i := 1; i <= 7; i++ {
increased = manager.IncreaseTrackedResource(queuePath1,
TestApp2, usage, user4)
- if !increased {
- t.Fatalf("unable to increase tracked resource:
queuepath %s, app %s, res %v", queuePath1, TestApp1, user1)
- }
+ assert.Equal(t, increased, true)
}
// user4 should not be able to run app as user4 max limit is
map[memory:70 vcores:70] and usage so far is map[memory:70 vcores:70]
@@ -540,6 +520,28 @@ func TestUpdateConfigClearEarlierSetLimits(t *testing.T) {
assert.Equal(t, len(manager.groupWildCardLimitsConfig), 0)
}
+func TestUpdateConfigClearEarlierSetGroupLimits(t *testing.T) {
+ setupUGM()
+ user := security.UserGroup{User: "user1", Groups: []string{"group1"}}
+ conf := createConfigWithGroupOnly(user.Groups[0], 50, 5)
+
+ manager := GetUserManager()
+ assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+
+ usage, err := resources.NewResourceFromConf(map[string]string{"memory":
"25", "vcores": "25"})
+ if err != nil {
+ t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage)
+ }
+ cQueue := "root.parent.leaf"
+ for i := 1; i <= 2; i++ {
+ increased := manager.IncreaseTrackedResource(cQueue, TestApp1,
usage, user)
+ assert.Equal(t, increased, true, "unable to increase tracked
resource: queuepath "+cQueue+", app "+TestApp1+", res "+usage.String())
+ }
+ assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+ increased := manager.IncreaseTrackedResource(cQueue, TestApp1, usage,
user)
+ assert.Equal(t, increased, false, "unable to increase tracked resource:
queuepath "+cQueue+", app "+TestApp1+", res "+usage.String())
+}
+
func TestSetMaxLimitsForRemovedUsers(t *testing.T) {
setupUGM()
// Queue setup:
@@ -653,8 +655,12 @@ func TestUserGroupHeadroom(t *testing.T) {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage)
}
// ensure group headroom returned when there is no limit settings
configured for user
+ // group1 quota (map[memory:70 vcores:70]) has been removed from
root.parent.leaf.
+ // so resource usage has been decreased for the same group and that too
for entire queue hierarchy (root->parent->leaf)
+ // since group1 quota has been configured for root.parent, resource
usage would be increased from
+ // the place where it has been left. so there is no usage after the
recent config change, entire group's quota would be returned as headroom.
headroom = manager.Headroom("root.parent", TestApp1, user)
- assert.Equal(t, resources.Equals(headroom, resources.Sub(usage1,
usage)), true)
+ assert.Equal(t, resources.Equals(headroom, usage1), true)
}
func TestDecreaseTrackedResourceForGroupTracker(t *testing.T) {
@@ -1304,6 +1310,48 @@ func createConfigWithDifferentGroups(user string, group
string, resourceKey stri
return conf
}
+func createConfigWithGroupOnly(group string, mem int, maxApps uint64)
configs.PartitionConfig {
+ conf := configs.PartitionConfig{
+ Name: "test",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "root",
+ Parent: true,
+ SubmitACL: "*",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "parent",
+ Parent: true,
+ SubmitACL: "*",
+ Queues: []configs.QueueConfig{
+ {
+ Name:
"leaf",
+ Parent:
false,
+ SubmitACL: "*",
+ Queues: nil,
+ },
+ },
+ Limits: []configs.Limit{
+ {
+ Limit: "parent
queue limit",
+ Groups:
[]string{
+ group,
+ },
+ MaxResources:
map[string]string{
+
"memory": strconv.Itoa(mem),
+
"vcores": strconv.Itoa(mem),
+ },
+
MaxApplications: maxApps,
+ },
+ },
+ },
+ },
+ },
+ },
+ }
+ return conf
+}
+
func createConfigWithoutLimits() configs.PartitionConfig {
conf := configs.PartitionConfig{
Name: "test",
@@ -1330,6 +1378,7 @@ func setupUGM() {
manager := GetUserManager()
manager.ClearUserTrackers()
manager.ClearGroupTrackers()
+ manager.ClearConfigLimits()
}
func assertUGM(t *testing.T, userGroup security.UserGroup, expected
*resources.Resource, usersCount int) {
diff --git a/pkg/scheduler/ugm/queue_tracker.go
b/pkg/scheduler/ugm/queue_tracker.go
index deef4a74..39124c25 100644
--- a/pkg/scheduler/ugm/queue_tracker.go
+++ b/pkg/scheduler/ugm/queue_tracker.go
@@ -250,6 +250,7 @@ func (qt *QueueTracker) headroom(hierarchy []string)
*resources.Resource {
}
childHeadroom =
qt.childQueueTrackers[childName].headroom(hierarchy[1:])
}
+
// arrived at the leaf or on the way out: check against current max if
set
if !resources.Equals(resources.NewResource(), qt.maxResources) {
headroom = qt.maxResources.Clone()
@@ -349,6 +350,10 @@ func (qt *QueueTracker) UnlinkQT(hierarchy []string) bool {
if qt.childQueueTrackers[childName] != nil {
if
qt.childQueueTrackers[childName].UnlinkQT(hierarchy[1:]) {
delete(qt.childQueueTrackers, childName)
+ // returning false, so that it comes out when
end queue detach itself from its immediate parent.
+ // i.e., once leaf detached from root.parent
for root.parent.leaf queue path.
+ // otherwise, detachment continues all the way
upto the root, even parent from root. which is not needed.
+ return false
}
}
} else if len(hierarchy) <= 1 {
@@ -389,12 +394,10 @@ func (qt *QueueTracker)
decreaseTrackedResourceUsageDownwards(hierarchy []string
}
}
}
-
if len(qt.runningApplications) > 0 && qt.resourceUsage !=
resources.NewResource() {
qt.resourceUsage = resources.NewResource()
qt.runningApplications = make(map[string]bool)
}
-
return removedApplications
}
@@ -456,3 +459,29 @@ func (qt *QueueTracker) canRunApp(hierarchy []string,
applicationID string, trac
}
return true
}
+
+// canBeRemoved Start from root and reach all levels of queue hierarchy to
confirm whether corresponding queue tracker
+// object can be removed from ugm or not. Based on running applications,
resource usage, child queue trackers, max running apps, max resources etc
+// it decides the removal. It returns false the moment it sees any unexpected
values for any queue in any levels.
+func (qt *QueueTracker) canBeRemoved() bool {
+ for _, childQT := range qt.childQueueTrackers {
+ // quick check to avoid further traversal
+ if childQT.canBeRemovedInternal() {
+ if !childQT.canBeRemoved() {
+ return false
+ }
+ } else {
+ return false
+ }
+ }
+ // reached leaf queues, no more to traverse
+ return qt.canBeRemovedInternal()
+}
+
+func (qt *QueueTracker) canBeRemovedInternal() bool {
+ if len(qt.runningApplications) == 0 &&
resources.Equals(resources.NewResource(), qt.resourceUsage) &&
len(qt.childQueueTrackers) == 0 &&
+ qt.maxRunningApps == 0 &&
resources.Equals(resources.NewResource(), qt.maxResources) {
+ return true
+ }
+ return false
+}
diff --git a/pkg/scheduler/ugm/user_tracker.go
b/pkg/scheduler/ugm/user_tracker.go
index 5639253d..5e842d58 100644
--- a/pkg/scheduler/ugm/user_tracker.go
+++ b/pkg/scheduler/ugm/user_tracker.go
@@ -164,11 +164,10 @@ func (ut *UserTracker) UnlinkQT(queuePath string) bool {
return ut.queueTracker.UnlinkQT(strings.Split(queuePath, configs.DOT))
}
-// canBeRemoved Does "root" queue has any child queue trackers? Is there any
running applications in "root" qt?
func (ut *UserTracker) canBeRemoved() bool {
ut.RLock()
defer ut.RUnlock()
- return len(ut.queueTracker.childQueueTrackers) == 0 &&
len(ut.queueTracker.runningApplications) == 0
+ return ut.queueTracker.canBeRemoved()
}
func (ut *UserTracker) canRunApp(queuePath, applicationID string) bool {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]