This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new fa43406d [YUNIKORN-1858] Handle group limit config changes (#600)
fa43406d is described below
commit fa43406dd9e7664749b6c5283e1460b5ae1ca9e3
Author: Manikandan R <[email protected]>
AuthorDate: Mon Aug 21 17:20:58 2023 +1000
[YUNIKORN-1858] Handle group limit config changes (#600)
handle config changes for groups and clean up linked usage tracking
objects when the config changes.
Closes: #600
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
pkg/scheduler/objects/application.go | 1 +
pkg/scheduler/partition_test.go | 20 ++++++++++++
pkg/scheduler/ugm/group_tracker.go | 36 +++++++++++++--------
pkg/scheduler/ugm/group_tracker_test.go | 18 +++++------
pkg/scheduler/ugm/manager.go | 51 +++++++++---------------------
pkg/scheduler/ugm/queue_tracker.go | 56 +++++++++++++++++++++++++++------
pkg/scheduler/ugm/user_tracker.go | 2 +-
pkg/scheduler/utilities_test.go | 4 +++
8 files changed, 118 insertions(+), 70 deletions(-)
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index bdbf72fd..1f47c7e1 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -681,6 +681,7 @@ func (sa *Application) AddAllocationAsk(ask *AllocationAsk)
error {
log.Log(log.SchedApplication).Info("ask added successfully to
application",
zap.String("appID", sa.ApplicationID),
+ zap.String("appID", sa.user.User),
zap.String("ask", ask.GetAllocationKey()),
zap.Bool("placeholder", ask.IsPlaceholder()),
zap.Stringer("pendingDelta", delta))
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index d1f55669..9f4c8125 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -3527,6 +3527,26 @@ func TestUserHeadroom(t *testing.T) {
if alloc != nil {
t.Fatal("allocation should not happen on other nodes as well")
}
+ partition.removeApplication("app-5")
+
+ app6 := newApplicationWithUser("app-6", "default",
"root.parent.sub-leaf", security.UserGroup{
+ User: "testuser1",
+ Groups: []string{"testgroup1"},
+ })
+ res, err = resources.NewResourceFromConf(map[string]string{"memory":
"3", "vcores": "3"})
+ assert.NilError(t, err, "failed to create resource")
+
+ err = partition.AddApplication(app6)
+ assert.NilError(t, err, "failed to add app-6 to partition")
+ err = app6.AddAllocationAsk(newAllocationAsk(allocID, "app-6", res))
+ assert.NilError(t, err, "failed to add ask alloc-1 to app-6")
+
+ // app 6 would be allocated as headroom is nil because no limits
are configured for the 'testuser1' user
+ alloc = partition.tryAllocate()
+ if alloc == nil {
+ t.Fatal("allocation did not return any allocation")
+ }
+ assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not
the expected allocated")
}
func TestPlaceholderAllocationTracking(t *testing.T) {
diff --git a/pkg/scheduler/ugm/group_tracker.go
b/pkg/scheduler/ugm/group_tracker.go
index ff410dc2..8fde8618 100644
--- a/pkg/scheduler/ugm/group_tracker.go
+++ b/pkg/scheduler/ugm/group_tracker.go
@@ -27,9 +27,9 @@ import (
)
type GroupTracker struct {
- groupName string // Name of the group for which usage is
being tracked upon
- applications map[string]bool // Hold applications currently run by all
users belong to this group
- queueTracker *QueueTracker // Holds the actual resource usage of
queue path where application run
+ groupName string // Name of the group for which usage is
being tracked upon
+ applications map[string]string // Hold applications currently run by
all users belong to this group
+ queueTracker *QueueTracker // Holds the actual resource usage of
queue path where application run
sync.RWMutex
}
@@ -38,19 +38,19 @@ func newGroupTracker(group string) *GroupTracker {
queueTracker := newRootQueueTracker()
groupTracker := &GroupTracker{
groupName: group,
- applications: make(map[string]bool),
+ applications: make(map[string]string),
queueTracker: queueTracker,
}
return groupTracker
}
-func (gt *GroupTracker) increaseTrackedResource(queuePath, applicationID
string, usage *resources.Resource) bool {
+func (gt *GroupTracker) increaseTrackedResource(queuePath, applicationID
string, usage *resources.Resource, user string) bool {
if gt == nil {
return true
}
gt.Lock()
defer gt.Unlock()
- gt.applications[applicationID] = true
+ gt.applications[applicationID] = user
return gt.queueTracker.increaseTrackedResource(queuePath,
applicationID, group, usage)
}
@@ -66,7 +66,7 @@ func (gt *GroupTracker) decreaseTrackedResource(queuePath,
applicationID string,
return gt.queueTracker.decreaseTrackedResource(queuePath,
applicationID, usage, removeApp)
}
-func (gt *GroupTracker) getTrackedApplications() map[string]bool {
+func (gt *GroupTracker) getTrackedApplications() map[string]string {
gt.RLock()
defer gt.RUnlock()
return gt.applications
@@ -123,15 +123,25 @@ func (gt *GroupTracker) canBeRemoved() bool {
return len(gt.queueTracker.childQueueTrackers) == 0 &&
len(gt.queueTracker.runningApplications) == 0
}
-func (gt *GroupTracker) removeApp(applicationID string) {
- gt.Lock()
- defer gt.Unlock()
- delete(gt.applications, applicationID)
-}
-
func (gt *GroupTracker) getName() string {
if gt == nil {
return common.Empty
}
return gt.groupName
}
+
+func (gt *GroupTracker) decreaseAllTrackedResourceUsage(queuePath string)
map[string]string {
+ if gt == nil {
+ return nil
+ }
+ gt.Lock()
+ defer gt.Unlock()
+ applications :=
gt.queueTracker.decreaseTrackedResourceUsageDownwards(queuePath)
+ removedApplications := make(map[string]string)
+ for app := range applications {
+ if u, ok := gt.applications[app]; ok {
+ removedApplications[app] = u
+ }
+ }
+ return removedApplications
+}
diff --git a/pkg/scheduler/ugm/group_tracker_test.go
b/pkg/scheduler/ugm/group_tracker_test.go
index 7d0ce280..5dbbe544 100644
--- a/pkg/scheduler/ugm/group_tracker_test.go
+++ b/pkg/scheduler/ugm/group_tracker_test.go
@@ -40,7 +40,7 @@ func TestGTIncreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage1)
}
- result := groupTracker.increaseTrackedResource(queuePath1, TestApp1,
usage1)
+ result := groupTracker.increaseTrackedResource(queuePath1, TestApp1,
usage1, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp1, usage1)
}
@@ -49,7 +49,7 @@ func TestGTIncreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage2)
}
- result = groupTracker.increaseTrackedResource(queuePath2, TestApp2,
usage2)
+ result = groupTracker.increaseTrackedResource(queuePath2, TestApp2,
usage2, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath2, TestApp2, usage2)
}
@@ -58,7 +58,7 @@ func TestGTIncreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- result = groupTracker.increaseTrackedResource(queuePath3, TestApp3,
usage3)
+ result = groupTracker.increaseTrackedResource(queuePath3, TestApp3,
usage3, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath3, TestApp3, usage3)
}
@@ -67,7 +67,7 @@ func TestGTIncreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- result = groupTracker.increaseTrackedResource(queuePath4, TestApp4,
usage4)
+ result = groupTracker.increaseTrackedResource(queuePath4, TestApp4,
usage4, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath4, TestApp4, usage4)
}
@@ -92,7 +92,7 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage1)
}
- result := groupTracker.increaseTrackedResource(queuePath1, TestApp1,
usage1)
+ result := groupTracker.increaseTrackedResource(queuePath1, TestApp1,
usage1, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp1, usage1)
}
@@ -102,7 +102,7 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage2)
}
- result = groupTracker.increaseTrackedResource(queuePath2, TestApp2,
usage2)
+ result = groupTracker.increaseTrackedResource(queuePath2, TestApp2,
usage2, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath2, TestApp2, usage2)
}
@@ -172,7 +172,7 @@ func TestGTSetMaxLimits(t *testing.T) {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage1)
}
- result := groupTracker.increaseTrackedResource(queuePath1, TestApp1,
usage1)
+ result := groupTracker.increaseTrackedResource(queuePath1, TestApp1,
usage1, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp1, usage1)
}
@@ -180,11 +180,11 @@ func TestGTSetMaxLimits(t *testing.T) {
groupTracker.setLimits(queuePath1, resources.Multiply(usage1, 5), 5)
groupTracker.setLimits("root.parent", resources.Multiply(usage1, 10),
10)
- result = groupTracker.increaseTrackedResource(queuePath1, TestApp2,
usage1)
+ result = groupTracker.increaseTrackedResource(queuePath1, TestApp2,
usage1, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp2, usage1)
}
- result = groupTracker.increaseTrackedResource(queuePath1, TestApp3,
usage1)
+ result = groupTracker.increaseTrackedResource(queuePath1, TestApp3,
usage1, user.User)
if !result {
t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", queuePath1, TestApp2, usage1)
}
diff --git a/pkg/scheduler/ugm/manager.go b/pkg/scheduler/ugm/manager.go
index d71ad032..76141573 100644
--- a/pkg/scheduler/ugm/manager.go
+++ b/pkg/scheduler/ugm/manager.go
@@ -408,46 +408,20 @@ func (m *Manager) processGroupConfig(group string,
limitConfig *LimitConfig, que
// clearEarlierSetLimits Clear already configured limits of users and groups
for which limits have been configured before but not now
func (m *Manager) clearEarlierSetLimits(userLimits map[string]bool,
groupLimits map[string]bool, queuePath string) error {
- // Clear already configured limits of user for which limits have been
configured before but not now
- for u, ut := range m.userTrackers {
- // Is this user already tracked for the queue path?
- if ut.IsQueuePathTrackedCompletely(queuePath) {
- // Traverse all the group trackers linked to user
through different applications and remove the linkage
- for appID, gt := range ut.appGroupTrackers {
- if gt != nil {
- g := gt.groupName
- // Is there any limit config set for
group in the current configuration? If not, then remove the linkage by setting
it to nil
- if ok := groupLimits[g]; !ok {
-
log.Log(log.SchedUGM).Debug("Removed the linkage between user and group through
applications",
- zap.String("user", u),
- zap.String("group",
gt.groupName),
- zap.String("application
id", appID),
- zap.String("queue
path", queuePath))
- // removing the linkage only
happens here by setting it to nil and deleting app id
- // but group resource usage so
far remains as it is because we don't have app id wise resource usage with in
group as of now.
- // YUNIKORN-1858 handles the
group resource usage properly
- // In case of only one (last)
application, group tracker would be removed from the manager.
- ut.setGroupForApp(appID, nil)
- gt.removeApp(appID)
- if
len(gt.getTrackedApplications()) == 0 {
-
log.Log(log.SchedUGM).Debug("Is this app the only running application in
group?",
-
zap.String("user", u),
-
zap.String("group", gt.groupName),
- zap.Int("no. of
applications", len(gt.getTrackedApplications())),
-
zap.String("application id", appID),
-
zap.String("queue path", queuePath))
- delete(m.groupTrackers,
g)
- }
- }
- }
+ // Clear already configured limits of group for which limits have been
configured before but not now
+ for _, gt := range m.groupTrackers {
+ appUsersMap := m.clearEarlierSetGroupLimits(gt, queuePath,
groupLimits)
+ if len(appUsersMap) > 0 {
+ for app, user := range appUsersMap {
+ ut := m.userTrackers[user]
+ ut.setGroupForApp(app, nil)
}
}
- m.clearEarlierSetUserLimits(ut, queuePath, userLimits)
}
- // Clear already configured limits of group for which limits have been
configured before but not now
- for _, gt := range m.groupTrackers {
- m.clearEarlierSetGroupLimits(gt, queuePath, groupLimits)
+ // Clear already configured limits of user for which limits have been
configured before but not now
+ for _, ut := range m.userTrackers {
+ m.clearEarlierSetUserLimits(ut, queuePath, userLimits)
}
return nil
}
@@ -480,7 +454,8 @@ func (m *Manager) clearEarlierSetUserLimits(ut
*UserTracker, queuePath string, u
}
}
-func (m *Manager) clearEarlierSetGroupLimits(gt *GroupTracker, queuePath
string, groupLimits map[string]bool) {
+func (m *Manager) clearEarlierSetGroupLimits(gt *GroupTracker, queuePath
string, groupLimits map[string]bool) map[string]string {
+ appUsersMap := make(map[string]string)
// Is this group already tracked for the queue path?
if gt.IsQueuePathTrackedCompletely(queuePath) {
g := gt.groupName
@@ -489,6 +464,7 @@ func (m *Manager) clearEarlierSetGroupLimits(gt
*GroupTracker, queuePath string,
log.Log(log.SchedUGM).Debug("Need to clear earlier set
configs for group",
zap.String("group", g),
zap.String("queue path", queuePath))
+ appUsersMap =
gt.decreaseAllTrackedResourceUsage(queuePath)
// Is there any running applications in end queue of
this queue path? If not, then remove the linkage between end queue and its
immediate parent
if gt.IsUnlinkRequired(queuePath) {
gt.UnlinkQT(queuePath)
@@ -506,6 +482,7 @@ func (m *Manager) clearEarlierSetGroupLimits(gt
*GroupTracker, queuePath string,
}
}
}
+ return appUsersMap
}
func (m *Manager) setUserLimits(user string, limitConfig *LimitConfig,
queuePath string) error {
diff --git a/pkg/scheduler/ugm/queue_tracker.go
b/pkg/scheduler/ugm/queue_tracker.go
index c793ffc0..edcb0c07 100644
--- a/pkg/scheduler/ugm/queue_tracker.go
+++ b/pkg/scheduler/ugm/queue_tracker.go
@@ -88,7 +88,7 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath
string, applicationID
zap.String("queue path", queuePath),
zap.Bool("existing app", existingApp),
zap.Uint64("max running apps", qt.maxRunningApps),
- zap.String("max resources", qt.maxResources.String()))
+ zap.Stringer("max resources", qt.maxResources))
if (!existingApp && len(qt.runningApplications)+1 >
int(qt.maxRunningApps)) ||
resources.StrictlyGreaterThan(finalResourceUsage,
qt.maxResources) {
log.Log(log.SchedUGM).Warn("Unable to increase resource
usage as allowing new application to run would exceed either configured max
applications or max resources limit of specific user/group",
@@ -97,8 +97,8 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath
string, applicationID
zap.Bool("existing app", existingApp),
zap.Int("current running applications",
len(qt.runningApplications)),
zap.Uint64("max running applications",
qt.maxRunningApps),
- zap.String("current resource usage",
qt.resourceUsage.String()),
- zap.String("max resource usage",
qt.maxResources.String()))
+ zap.Stringer("current resource usage",
qt.resourceUsage),
+ zap.Stringer("max resource usage",
qt.maxResources))
return false
}
}
@@ -118,7 +118,7 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath
string, applicationID
zap.String("queue path", queuePath),
zap.Bool("existing app", existingApp),
zap.Uint64("wild card max running apps",
config.maxApplications),
- zap.String("wild card max resources",
config.maxResources.String()),
+ zap.Stringer("wild card max resources",
config.maxResources),
zap.Bool("wild card quota exceeded",
wildCardQuotaExceeded))
wildCardQuotaExceeded = (config.maxApplications != 0 &&
!existingApp && len(qt.runningApplications)+1 > int(config.maxApplications)) ||
(!resources.Equals(resources.NewResource(),
config.maxResources) && resources.StrictlyGreaterThan(finalResourceUsage,
config.maxResources))
@@ -129,8 +129,8 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath
string, applicationID
zap.Bool("existing app", existingApp),
zap.Int("current running applications",
len(qt.runningApplications)),
zap.Uint64("max running applications",
config.maxApplications),
- zap.String("current resource usage",
qt.resourceUsage.String()),
- zap.String("max resource usage",
config.maxResources.String()))
+ zap.Stringer("current resource usage",
qt.resourceUsage),
+ zap.Stringer("max resource usage",
config.maxResources))
return false
}
}
@@ -158,7 +158,7 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath
string, applicationID
zap.Bool("existing app", existingApp),
zap.Stringer("resource", usage),
zap.Uint64("max running applications", qt.maxRunningApps),
- zap.String("max resource usage", qt.maxResources.String()),
+ zap.Stringer("max resource usage", qt.maxResources),
zap.Stringer("total resource after increasing",
qt.resourceUsage),
zap.Int("total applications after increasing",
len(qt.runningApplications)))
return true
@@ -239,7 +239,7 @@ func (qt *QueueTracker) setLimit(queuePath string,
maxResource *resources.Resour
log.Log(log.SchedUGM).Debug("Setting limits",
zap.String("queue path", queuePath),
zap.Uint64("max applications", maxApps),
- zap.String("max resources", maxResource.String()))
+ zap.Stringer("max resources", maxResource))
childQueueTracker := qt.getChildQueueTracker(queuePath)
childQueueTracker.maxRunningApps = maxApps
childQueueTracker.maxResources = maxResource
@@ -268,8 +268,8 @@ func (qt *QueueTracker) headroom(queuePath string)
*resources.Resource {
log.Log(log.SchedUGM).Debug("Calculated headroom",
zap.String("queue path", queuePath),
zap.String("queue", qt.queueName),
- zap.String("max resource", qt.maxResources.String()),
- zap.String("headroom", headroom.String()))
+ zap.Stringer("max resource", qt.maxResources),
+ zap.Stringer("headroom", headroom))
return headroom
}
return nil
@@ -364,3 +364,39 @@ func (qt *QueueTracker) UnlinkQT(queuePath string) bool {
}
return false
}
+
+// decreaseTrackedResourceUsageDownwards queuePath could be either a parent or
leaf queue path. If it is a parent queue path, then traverse up to the end leaf
+// recursively for all child queues, resetting resourceUsage and
runningApplications to the default value.
+// Once the downward traversal has been completed, traverse upwards using
decreaseTrackedResourceUsageUpwards.
+func (qt *QueueTracker) decreaseTrackedResourceUsageDownwards(queuePath
string) map[string]bool {
+ childQueueTracker := qt.getChildQueueTracker(queuePath)
+ childQueueTrackers := childQueueTracker.childQueueTrackers
+ removedApplications := make(map[string]bool)
+ for _, childQT := range childQueueTrackers {
+ if len(childQT.runningApplications) > 0 &&
childQT.resourceUsage != resources.NewResource() {
+ removedApplications = childQT.runningApplications
+ childQT.resourceUsage = resources.NewResource()
+ childQT.runningApplications = make(map[string]bool)
+
childQT.decreaseTrackedResourceUsageDownwards(childQT.queuePath)
+ }
+ }
+ qt.decreaseTrackedResourceUsageUpwards(queuePath)
+ return removedApplications
+}
+
+// decreaseTrackedResourceUsageUpwards Traverse upwards all the way up to the
root starting from the last queue in queuePath,
+// and reset resourceUsage and runningApplications to the default value.
+func (qt *QueueTracker) decreaseTrackedResourceUsageUpwards(queuePath string) {
+ childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
+ if childQueuePath != common.Empty {
+ if qt.childQueueTrackers[immediateChildQueueName] == nil {
+ log.Log(log.SchedUGM).Error("Child queueTracker tracker
must be available in child queues map",
+ zap.String("child queueTracker name",
immediateChildQueueName))
+ }
+
qt.childQueueTrackers[immediateChildQueueName].decreaseTrackedResourceUsageUpwards(childQueuePath)
+ }
+ if len(qt.runningApplications) > 0 && qt.resourceUsage !=
resources.NewResource() {
+ qt.resourceUsage = resources.NewResource()
+ qt.runningApplications = make(map[string]bool)
+ }
+}
diff --git a/pkg/scheduler/ugm/user_tracker.go
b/pkg/scheduler/ugm/user_tracker.go
index 0dcb4b95..f0c86486 100644
--- a/pkg/scheduler/ugm/user_tracker.go
+++ b/pkg/scheduler/ugm/user_tracker.go
@@ -63,7 +63,7 @@ func (ut *UserTracker) increaseTrackedResource(queuePath,
applicationID string,
zap.String("queue path", queuePath),
zap.String("application", applicationID),
zap.Stringer("resource", usage))
- increasedGroupUsage := gt.increaseTrackedResource(queuePath,
applicationID, usage)
+ increasedGroupUsage := gt.increaseTrackedResource(queuePath,
applicationID, usage, ut.userName)
if !increasedGroupUsage {
_, decreased :=
ut.queueTracker.decreaseTrackedResource(queuePath, applicationID, usage, false)
if !decreased {
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index d3bf9683..e4eb9e97 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -426,6 +426,10 @@ func newApplication(appID, partition, queueName string)
*objects.Application {
User: "testuser",
Groups: []string{"testgroup"},
}
+ return newApplicationWithUser(appID, partition, queueName, user)
+}
+
+func newApplicationWithUser(appID, partition, queueName string, user
security.UserGroup) *objects.Application {
siApp := &si.AddApplicationRequest{
ApplicationID: appID,
QueueName: queueName,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]