This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new aee3f38c [YUNIKORN-2134] Use nil resource instead of NewResource()
(#706)
aee3f38c is described below
commit aee3f38c31647c998564ab6086774bd77299db2d
Author: Manikandan R <[email protected]>
AuthorDate: Thu Nov 9 16:38:32 2023 +0530
[YUNIKORN-2134] Use nil resource instead of NewResource() (#706)
Closes: #706
Signed-off-by: Manikandan R <[email protected]>
---
pkg/scheduler/ugm/group_tracker_test.go | 2 ++
pkg/scheduler/ugm/manager.go | 46 +++++++++++++++++----------------
pkg/scheduler/ugm/queue_tracker.go | 29 +++++++++++----------
3 files changed, 42 insertions(+), 35 deletions(-)
diff --git a/pkg/scheduler/ugm/group_tracker_test.go
b/pkg/scheduler/ugm/group_tracker_test.go
index 5dbbe544..4a8f3563 100644
--- a/pkg/scheduler/ugm/group_tracker_test.go
+++ b/pkg/scheduler/ugm/group_tracker_test.go
@@ -86,6 +86,8 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
// Queue setup:
// root->parent->child1
// root->parent->child2
+ // Initialize ugm
+ GetUserManager()
user := &security.UserGroup{User: "test", Groups: []string{"test"}}
groupTracker := newGroupTracker(user.User)
usage1, err := resources.NewResourceFromConf(map[string]string{"mem":
"70M", "vcore": "70"})
diff --git a/pkg/scheduler/ugm/manager.go b/pkg/scheduler/ugm/manager.go
index 586366d2..bbbd37f0 100644
--- a/pkg/scheduler/ugm/manager.go
+++ b/pkg/scheduler/ugm/manager.go
@@ -167,26 +167,6 @@ func (m *Manager) DecreaseTrackedResource(queuePath,
applicationID string, usage
return true
}
-func (m *Manager) GetUserResources(user security.UserGroup)
*resources.Resource {
- m.RLock()
- defer m.RUnlock()
- ut := m.userTrackers[user.User]
- if ut != nil &&
len(ut.GetUserResourceUsageDAOInfo().Queues.ResourceUsage.Resources) > 0 {
- return ut.GetUserResourceUsageDAOInfo().Queues.ResourceUsage
- }
- return nil
-}
-
-func (m *Manager) GetGroupResources(group string) *resources.Resource {
- m.RLock()
- defer m.RUnlock()
- gt := m.groupTrackers[group]
- if gt != nil &&
len(gt.GetGroupResourceUsageDAOInfo().Queues.ResourceUsage.Resources) > 0 {
- return gt.GetGroupResourceUsageDAOInfo().Queues.ResourceUsage
- }
- return nil
-}
-
func (m *Manager) GetUsersResources() []*UserTracker {
m.RLock()
defer m.RUnlock()
@@ -451,7 +431,7 @@ func (m *Manager) resetUserEarlierUsage(ut *UserTracker,
queuePath string) {
log.Log(log.SchedUGM).Debug("Need to clear earlier set configs
for user",
zap.String("user", ut.userName),
zap.String("queue path", queuePath))
- ut.setLimits(queuePath, resources.NewResource(), 0)
+ ut.setLimits(queuePath, nil, 0)
// Is there any running applications in end queue of this queue
path? If not, then remove the linkage between end queue and its immediate parent
if ut.IsUnlinkRequired(queuePath) {
ut.UnlinkQT(queuePath)
@@ -503,7 +483,7 @@ func (m *Manager) resetGroupEarlierUsage(gt *GroupTracker,
queuePath string) {
ut := m.userTrackers[u]
delete(ut.appGroupTrackers, app)
}
- gt.setLimits(queuePath, resources.NewResource(), 0)
+ gt.setLimits(queuePath, nil, 0)
// Is there any running applications in end queue of this queue
path? If not, then remove the linkage between end queue and its immediate parent
if gt.IsUnlinkRequired(queuePath) {
gt.UnlinkQT(queuePath)
@@ -684,3 +664,25 @@ func (m *Manager) ClearConfigLimits() {
m.userLimits = make(map[string]map[string]*LimitConfig)
m.groupLimits = make(map[string]map[string]*LimitConfig)
}
+
+// GetUserResources only for tests
+func (m *Manager) GetUserResources(user security.UserGroup)
*resources.Resource {
+ m.RLock()
+ defer m.RUnlock()
+ ut := m.userTrackers[user.User]
+ if ut != nil && ut.GetUserResourceUsageDAOInfo().Queues.ResourceUsage
!= nil && len(ut.GetUserResourceUsageDAOInfo().Queues.ResourceUsage.Resources)
> 0 {
+ return ut.GetUserResourceUsageDAOInfo().Queues.ResourceUsage
+ }
+ return nil
+}
+
+// GetGroupResources only for tests
+func (m *Manager) GetGroupResources(group string) *resources.Resource {
+ m.RLock()
+ defer m.RUnlock()
+ gt := m.groupTrackers[group]
+ if gt != nil && gt.GetGroupResourceUsageDAOInfo().Queues.ResourceUsage
!= nil && len(gt.GetGroupResourceUsageDAOInfo().Queues.ResourceUsage.Resources)
> 0 {
+ return gt.GetGroupResourceUsageDAOInfo().Queues.ResourceUsage
+ }
+ return nil
+}
diff --git a/pkg/scheduler/ugm/queue_tracker.go
b/pkg/scheduler/ugm/queue_tracker.go
index a9a37efd..8a9fa17a 100644
--- a/pkg/scheduler/ugm/queue_tracker.go
+++ b/pkg/scheduler/ugm/queue_tracker.go
@@ -51,9 +51,9 @@ func newQueueTracker(queuePath string, queueName string)
*QueueTracker {
queueTracker := &QueueTracker{
queueName: queueName,
queuePath: qp,
- resourceUsage: resources.NewResource(),
+ resourceUsage: nil,
runningApplications: make(map[string]bool),
- maxResources: resources.NewResource(),
+ maxResources: nil,
maxRunningApps: 0,
childQueueTrackers: make(map[string]*QueueTracker),
}
@@ -89,13 +89,16 @@ func (qt *QueueTracker) increaseTrackedResource(hierarchy
[]string, applicationI
}
}
+ if qt.resourceUsage == nil {
+ qt.resourceUsage = resources.NewResource()
+ }
finalResourceUsage := qt.resourceUsage.Clone()
finalResourceUsage.AddTo(usage)
wildCardQuotaExceeded := false
existingApp := qt.runningApplications[applicationID]
// apply user/group specific limit settings set if configured,
otherwise use wild card limit settings
- if qt.maxRunningApps != 0 && !resources.Equals(resources.NewResource(),
qt.maxResources) {
+ if qt.maxRunningApps != 0 && !resources.IsZero(qt.maxResources) {
log.Log(log.SchedUGM).Debug("applying enforcement checks using
limit settings of specific user/group",
zap.Int("tracking type", int(trackType)),
zap.String("queue path", qt.queuePath),
@@ -117,7 +120,7 @@ func (qt *QueueTracker) increaseTrackedResource(hierarchy
[]string, applicationI
}
// Try wild card settings
- if qt.maxRunningApps == 0 && resources.Equals(resources.NewResource(),
qt.maxResources) {
+ if qt.maxRunningApps == 0 && resources.IsZero(qt.maxResources) {
// Is there any wild card settings? Do we need to apply
enforcement checks using wild card limit settings?
var config *LimitConfig
if trackType == user {
@@ -127,7 +130,7 @@ func (qt *QueueTracker) increaseTrackedResource(hierarchy
[]string, applicationI
}
if config != nil {
wildCardQuotaExceeded = (config.maxApplications != 0 &&
!existingApp && len(qt.runningApplications)+1 > int(config.maxApplications)) ||
- (!resources.Equals(resources.NewResource(),
config.maxResources) && resources.StrictlyGreaterThan(finalResourceUsage,
config.maxResources))
+ (!resources.IsZero(config.maxResources) &&
resources.StrictlyGreaterThan(finalResourceUsage, config.maxResources))
log.Log(log.SchedUGM).Debug("applying enforcement
checks using limit settings of wild card user/group",
zap.Int("tracking type", int(trackType)),
zap.String("queue path", qt.queuePath),
@@ -210,7 +213,7 @@ func (qt *QueueTracker) decreaseTrackedResource(hierarchy
[]string, applicationI
// Determine if the queue tracker should be removed
removeQT := len(qt.childQueueTrackers) == 0 &&
len(qt.runningApplications) == 0 && resources.IsZero(qt.resourceUsage) &&
- qt.maxRunningApps == 0 &&
resources.Equals(resources.NewResource(), qt.maxResources)
+ qt.maxRunningApps == 0 && resources.IsZero(qt.maxResources)
log.Log(log.SchedUGM).Debug("Remove queue tracker",
zap.String("queue path ", qt.queuePath),
zap.Bool("remove QT", removeQT))
@@ -255,7 +258,7 @@ func (qt *QueueTracker) headroom(hierarchy []string,
trackType trackingType) *re
}
// arrived at the leaf or on the way out: check against current max if
set
- if !resources.Equals(resources.NewResource(), qt.maxResources) {
+ if !resources.IsZero(qt.maxResources) {
headroom = qt.maxResources.Clone()
headroom.SubOnlyExisting(qt.resourceUsage)
log.Log(log.SchedUGM).Debug("Calculated headroom",
@@ -263,7 +266,7 @@ func (qt *QueueTracker) headroom(hierarchy []string,
trackType trackingType) *re
zap.Int("tracking type", int(trackType)),
zap.Stringer("max resource", qt.maxResources),
zap.Stringer("headroom", headroom))
- } else if resources.Equals(nil, childHeadroom) {
+ } else if resources.IsZero(childHeadroom) {
// If childHeadroom is not nil, it means there is an user or
wildcard limit config in child queue,
// so we don't check wildcard limit config in current queue.
@@ -409,15 +412,15 @@ func (qt *QueueTracker)
decreaseTrackedResourceUsageDownwards(hierarchy []string
// reach end of hierarchy, remove all resources under this queue
removedApplications = qt.runningApplications
for childName, childQT := range qt.childQueueTrackers {
- if len(childQT.runningApplications) > 0 &&
childQT.resourceUsage != resources.NewResource() {
+ if len(childQT.runningApplications) > 0 &&
!resources.IsZero(childQT.resourceUsage) {
// runningApplications in parent queue should
contain all the running applications in child queues,
// so we don't need to update
removedApplications from child queue result.
childQT.decreaseTrackedResourceUsageDownwards([]string{childName})
}
}
}
- if len(qt.runningApplications) > 0 && qt.resourceUsage !=
resources.NewResource() {
- qt.resourceUsage = resources.NewResource()
+ if len(qt.runningApplications) > 0 &&
!resources.IsZero(qt.resourceUsage) {
+ qt.resourceUsage = nil
qt.runningApplications = make(map[string]bool)
}
return removedApplications
@@ -501,8 +504,8 @@ func (qt *QueueTracker) canBeRemoved() bool {
}
func (qt *QueueTracker) canBeRemovedInternal() bool {
- if len(qt.runningApplications) == 0 &&
resources.Equals(resources.NewResource(), qt.resourceUsage) &&
len(qt.childQueueTrackers) == 0 &&
- qt.maxRunningApps == 0 &&
resources.Equals(resources.NewResource(), qt.maxResources) {
+ if len(qt.runningApplications) == 0 &&
resources.IsZero(qt.resourceUsage) && len(qt.childQueueTrackers) == 0 &&
+ qt.maxRunningApps == 0 && resources.IsZero(qt.maxResources) {
return true
}
return false
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]