This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new ca58806f [YUNIKORN-1917] enforce ugm maxapplications rule (#621)
ca58806f is described below
commit ca58806fcc83d6b500cac78fd87964396b370a95
Author: Frank Yang <[email protected]>
AuthorDate: Tue Aug 22 12:28:34 2023 +0530
[YUNIKORN-1917] enforce ugm maxapplications rule (#621)
Closes: #621
Signed-off-by: Manikandan R <[email protected]>
---
pkg/scheduler/objects/queue.go | 3 +-
pkg/scheduler/partition_test.go | 155 +++++++++++++++++++++++++++++++++++++
pkg/scheduler/ugm/group_tracker.go | 6 ++
pkg/scheduler/ugm/manager.go | 29 +++++++
pkg/scheduler/ugm/manager_test.go | 128 ++++++++++++++++++++++++++++++
pkg/scheduler/ugm/queue_tracker.go | 52 +++++++++++++
pkg/scheduler/ugm/user_tracker.go | 6 ++
7 files changed, 378 insertions(+), 1 deletion(-)
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 32255d01..902cfe20 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -37,6 +37,7 @@ import (
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/scheduler/objects/template"
"github.com/apache/yunikorn-core/pkg/scheduler/policies"
+ "github.com/apache/yunikorn-core/pkg/scheduler/ugm"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
)
@@ -1297,7 +1298,7 @@ func (sq *Queue) TryAllocate(iterator func()
NodeIterator, fullIterator func() N
// process the apps (filters out app without pending requests)
for _, app := range sq.sortApplications(true, false) {
- if app.IsAccepted() && !sq.canRunApp(app.ApplicationID)
{
+ if app.IsAccepted() &&
(!sq.canRunApp(app.ApplicationID) ||
!ugm.GetUserManager().CanRunApp(sq.QueuePath, app.ApplicationID, app.user)) {
continue
}
alloc := app.tryAllocate(headRoom, preemptionDelay,
&preemptAttemptsRemaining, iterator, fullIterator, getnode)
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 9f4c8125..88381404 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -3642,3 +3642,158 @@ func TestReservationTracking(t *testing.T) {
assert.Equal(t, 0, partition.getReservationCount())
assert.Equal(t, "alloc-2", alloc.GetAllocationKey())
}
+
+//nolint:funlen
+func TestLimitMaxApplications(t *testing.T) {
+ testCases := []struct {
+ name string
+ limits []configs.Limit
+ }{
+ {
+ name: "specific user",
+ limits: []configs.Limit{
+ {
+ Limit: "specific user limit",
+ Users: []string{"testuser"},
+ MaxResources:
map[string]string{"memory": "5", "vcores": "5"},
+ MaxApplications: 1,
+ },
+ },
+ },
+ {
+ name: "specific group",
+ limits: []configs.Limit{
+ {
+ Limit: "specific group limit",
+ Groups: []string{"testgroup"},
+ MaxResources:
map[string]string{"memory": "5", "vcores": "5"},
+ MaxApplications: 1,
+ },
+ },
+ },
+ {
+ name: "wildcard user",
+ limits: []configs.Limit{
+ {
+ Limit: "wildcard user limit",
+ Users: []string{"*"},
+ MaxResources:
map[string]string{"memory": "5", "vcores": "5"},
+ MaxApplications: 1,
+ },
+ },
+ },
+ {
+ name: "wildcard group",
+ limits: []configs.Limit{
+ {
+ Limit: "specific group limit",
+ Groups:
[]string{"nonexistent-group"},
+ MaxResources:
map[string]string{"memory": "500", "vcores": "500"},
+ MaxApplications: 100,
+ },
+ {
+ Limit: "wildcard group limit",
+ Groups: []string{"*"},
+ MaxResources:
map[string]string{"memory": "5", "vcores": "5"},
+ MaxApplications: 1,
+ },
+ },
+ },
+ {
+ name: "specific user lower than specific group limit",
+ limits: []configs.Limit{
+ {
+ Limit: "specific user limit",
+ Users: []string{"testuser"},
+ MaxResources:
map[string]string{"memory": "5", "vcores": "5"},
+ MaxApplications: 1,
+ },
+ {
+ Limit: "specific user limit",
+ Groups: []string{"testgroup"},
+ MaxResources:
map[string]string{"memory": "5", "vcores": "5"},
+ MaxApplications: 100,
+ },
+ },
+ },
+ {
+ name: "specific group lower than specific user limit",
+ limits: []configs.Limit{
+ {
+ Limit: "specific user limit",
+ Users: []string{"testuser"},
+ MaxResources:
map[string]string{"memory": "5", "vcores": "5"},
+ MaxApplications: 100,
+ },
+ {
+ Limit: "specific group limit",
+ Groups: []string{"testgroup"},
+ MaxResources:
map[string]string{"memory": "5", "vcores": "5"},
+ MaxApplications: 1,
+ },
+ },
+ },
+ }
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ setupUGM()
+ conf := configs.PartitionConfig{
+ Name: "default",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "root",
+ Parent: true,
+ SubmitACL: "*",
+ Queues: []configs.QueueConfig{
+ {
+ Name:
"default",
+ Parent: false,
+ Limits:
tc.limits,
+ },
+ },
+ },
+ },
+ NodeSortPolicy: configs.NodeSortingPolicy{},
+ }
+
+ partition, err := newPartitionContext(conf, rmID, nil)
+ assert.NilError(t, err, "partition create failed")
+
+ // add node1
+ nodeRes, err :=
resources.NewResourceFromConf(map[string]string{"memory": "10", "vcores": "10"})
+ assert.NilError(t, err, "failed to create basic
resource")
+ err = partition.AddNode(newNodeMaxResource("node-1",
nodeRes), nil)
+ assert.NilError(t, err, "test node1 add failed
unexpected")
+
+ resMap := map[string]string{"memory": "2", "vcores":
"2"}
+ res, err := resources.NewResourceFromConf(resMap)
+ assert.NilError(t, err, "Unexpected error when creating
resource from map")
+
+ // add app1
+ app1 := newApplication(appID1, "default", defQueue)
+ err = partition.AddApplication(app1)
+ assert.NilError(t, err, "add application to partition
should not have failed")
+ err = app1.AddAllocationAsk(newAllocationAsk(allocID,
appID1, res))
+ assert.NilError(t, err, "failed to add ask alloc-1 to
app-1")
+
+ alloc := partition.tryAllocate()
+ if alloc == nil {
+ t.Fatal("allocation did not return any
allocation")
+ }
+ assert.Equal(t, alloc.GetResult(), objects.Allocated,
"result is not the expected allocated")
+ assert.Equal(t, alloc.GetApplicationID(), appID1,
"expected application app-1 to be allocated")
+ assert.Equal(t, alloc.GetAllocationKey(), allocID,
"expected ask alloc-1 to be allocated")
+
+ // add app2
+ app2 := newApplication(appID2, "default", defQueue)
+ err = partition.AddApplication(app2)
+ assert.NilError(t, err, "add application to partition
should not have failed")
+ err = app2.AddAllocationAsk(newAllocationAsk(allocID2,
appID2, res))
+ assert.NilError(t, err, "failed to add ask alloc-2 to
app-1")
+ assert.Equal(t, app2.CurrentState(),
objects.Accepted.String(), "application should have moved to accepted state")
+
+ alloc = partition.tryAllocate()
+ assert.Equal(t, alloc == nil, true, "allocation should
not have happened as max apps reached")
+ })
+ }
+}
diff --git a/pkg/scheduler/ugm/group_tracker.go
b/pkg/scheduler/ugm/group_tracker.go
index 8fde8618..150016c5 100644
--- a/pkg/scheduler/ugm/group_tracker.go
+++ b/pkg/scheduler/ugm/group_tracker.go
@@ -145,3 +145,9 @@ func (gt *GroupTracker)
decreaseAllTrackedResourceUsage(queuePath string) map[st
}
return removedApplications
}
+
+// canRunApp returns true when the tracked group is still allowed to run the
+// given application in the queue identified by queuePath; the decision is
+// delegated to the hierarchical queueTracker walk with group tracking type.
+// NOTE(review): takes the full lock although the check only reads state; if
+// GroupTracker holds a sync.RWMutex an RLock would suffice — confirm.
+func (gt *GroupTracker) canRunApp(queuePath, applicationID string) bool {
+ gt.Lock()
+ defer gt.Unlock()
+ return gt.queueTracker.canRunApp(queuePath, applicationID, group)
+}
diff --git a/pkg/scheduler/ugm/manager.go b/pkg/scheduler/ugm/manager.go
index 76141573..fd40aafe 100644
--- a/pkg/scheduler/ugm/manager.go
+++ b/pkg/scheduler/ugm/manager.go
@@ -584,6 +584,35 @@ func (m *Manager) Headroom(queuePath, applicationID
string, user security.UserGr
return resources.ComponentWiseMinPermissive(userHeadroom, groupHeadroom)
}
+// CanRunApp checks the maxApplications for this specific application that
runs as the user and group.
+func (m *Manager) CanRunApp(queuePath, applicationID string, user
security.UserGroup) bool {
+ userTracker := m.getUserTracker(user.User)
+ userCanRunApp := userTracker.canRunApp(queuePath, applicationID)
+ log.Log(log.SchedUGM).Debug("Check whether user can run app",
+ zap.String("user", user.User),
+ zap.String("queue path", queuePath),
+ zap.Bool("can run app", userCanRunApp))
+ // make sure the user has a groupTracker for this application, if not
yet there add it
+ if !userTracker.hasGroupForApp(applicationID) {
+ m.ensureGroupTrackerForApp(queuePath, applicationID, user)
+ }
+ // check if this application now has group tracking, if not we're done
+ appGroup := userTracker.getGroupForApp(applicationID)
+ if appGroup == common.Empty {
+ return userCanRunApp
+ }
+ groupTracker := m.GetGroupTracker(appGroup)
+ if groupTracker == nil {
+ return userCanRunApp
+ }
+ groupCanRunApp := groupTracker.canRunApp(queuePath, applicationID)
+ log.Log(log.SchedUGM).Debug("Check whether group can run app",
+ zap.String("group", appGroup),
+ zap.String("queue path", queuePath),
+ zap.Bool("can run app", groupCanRunApp))
+ return userCanRunApp && groupCanRunApp
+}
+
// ClearUserTrackers only for tests
func (m *Manager) ClearUserTrackers() {
m.Lock()
diff --git a/pkg/scheduler/ugm/manager_test.go
b/pkg/scheduler/ugm/manager_test.go
index d39eea4f..7d54195c 100644
--- a/pkg/scheduler/ugm/manager_test.go
+++ b/pkg/scheduler/ugm/manager_test.go
@@ -19,6 +19,7 @@
package ugm
import (
+ "fmt"
"strconv"
"testing"
@@ -674,6 +675,133 @@ func TestDecreaseTrackedResourceForGroupTracker(t
*testing.T) {
assert.Equal(t,
resources.Equals(groupTracker.queueTracker.childQueueTrackers["parent"].resourceUsage,
resources.Zero), true)
}
+//nolint:funlen
+func TestCanRunApp(t *testing.T) {
+ testCases := []struct {
+ name string
+ limits []configs.Limit
+ }{
+ {
+ name: "specific user limit",
+ limits: []configs.Limit{
+ {
+ Limit: "specific user limit",
+ Users: []string{"user1"},
+ MaxApplications: 1,
+ },
+ },
+ },
+ {
+ name: "specific group limit",
+ limits: []configs.Limit{
+ {
+ Limit: "specific group limit",
+ Groups: []string{"group1"},
+ MaxApplications: 1,
+ },
+ },
+ },
+ {
+ name: "wildcard user limit",
+ limits: []configs.Limit{
+ {
+ Limit: "wildcard user limit",
+ Users: []string{"*"},
+ MaxApplications: 1,
+ },
+ },
+ },
+ {
+ name: "wildcard group limit",
+ limits: []configs.Limit{
+ {
+ Limit: "specific group limit",
+ Groups:
[]string{"nonexistent-group"},
+ MaxApplications: 100,
+ },
+ {
+ Limit: "wildcard group limit",
+ Groups: []string{"*"},
+ MaxApplications: 1,
+ },
+ },
+ },
+ {
+ name: "specific user lower than specific group limit",
+ limits: []configs.Limit{
+ {
+ Limit: "specific user limit",
+ Users: []string{"user1"},
+ MaxApplications: 1,
+ },
+ {
+ Limit: "specific group limit",
+ Groups: []string{"group1"},
+ MaxApplications: 100,
+ },
+ },
+ },
+ {
+ name: "specific group lower than specific user limit",
+ limits: []configs.Limit{
+ {
+ Limit: "specific user limit",
+ Users: []string{"user1"},
+ MaxApplications: 100,
+ },
+ {
+ Limit: "specific group limit",
+ Groups: []string{"group1"},
+ MaxApplications: 1,
+ },
+ },
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ setupUGM()
+ // Queue setup:
+ // root->default
+ conf := configs.PartitionConfig{
+ Name: "default",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "root",
+ Parent: true,
+ SubmitACL: "*",
+ Queues: []configs.QueueConfig{
+ {
+ Name:
"default",
+ Parent:
false,
+ SubmitACL: "*",
+ Limits:
tc.limits,
+ },
+ },
+ },
+ },
+ }
+ manager := GetUserManager()
+ assert.NilError(t, manager.UpdateConfig(conf.Queues[0],
"root"))
+
+ user := security.UserGroup{User: "user1", Groups:
[]string{"group1"}}
+ usage, err :=
resources.NewResourceFromConf(map[string]string{"memory": "50"})
+ if err != nil {
+ t.Errorf("new resource create returned error or
wrong resource: error %t, res %v", err, usage)
+ }
+
+ canRunApp := manager.CanRunApp("root.default",
TestApp1, user)
+ assert.Equal(t, canRunApp, true, fmt.Sprintf("user %s
should be able to run app %s", user.User, TestApp1))
+
+ increased :=
manager.IncreaseTrackedResource("root.default", TestApp1, usage, user)
+ assert.Equal(t, increased, true, "unable to increase
tracked resource: queuepath root.parent, app "+TestApp1+", res "+usage.String())
+
+ canRunApp = manager.CanRunApp("root.default", TestApp2,
user)
+ assert.Equal(t, canRunApp, false, fmt.Sprintf("user %s
shouldn't be able to run app %s", user.User, TestApp2))
+ })
+ }
+}
+
func createUpdateConfigWithWildCardUsersAndGroups(user string, group string,
wildUser string, wildGroup string, memory string, vcores string)
configs.PartitionConfig {
conf := configs.PartitionConfig{
Name: "test",
diff --git a/pkg/scheduler/ugm/queue_tracker.go
b/pkg/scheduler/ugm/queue_tracker.go
index edcb0c07..8dc0fe99 100644
--- a/pkg/scheduler/ugm/queue_tracker.go
+++ b/pkg/scheduler/ugm/queue_tracker.go
@@ -400,3 +400,55 @@ func (qt *QueueTracker)
decreaseTrackedResourceUsageUpwards(queuePath string) {
qt.runningApplications = make(map[string]bool)
}
}
+
+func (qt *QueueTracker) canRunApp(queuePath string, applicationID string,
trackType trackingType) bool {
+ log.Log(log.SchedUGM).Debug("Checking can run app",
+ zap.Int("tracking type", int(trackType)),
+ zap.String("queue path", queuePath),
+ zap.String("application", applicationID))
+ childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
+ if childQueuePath != common.Empty {
+ if qt.childQueueTrackers[immediateChildQueueName] != nil {
+ allowed :=
qt.childQueueTrackers[immediateChildQueueName].canRunApp(childQueuePath,
applicationID, trackType)
+ if !allowed {
+ return false
+ }
+ }
+ }
+
+ var running int
+ if existingApp := qt.runningApplications[applicationID]; existingApp {
+ return true
+ } else {
+ running = len(qt.runningApplications) + 1
+ }
+
+ // apply user/group specific limit settings set if configured,
otherwise use wild card limit settings
+ if qt.maxRunningApps != 0 && running > int(qt.maxRunningApps) {
+ log.Log(log.SchedUGM).Warn("can't run app as allowing new
application to run would exceed configured max applications limit of specific
user/group",
+ zap.Int("tracking type", int(trackType)),
+ zap.String("queue path", queuePath),
+ zap.Int("current running applications",
len(qt.runningApplications)),
+ zap.Uint64("max running applications",
qt.maxRunningApps))
+ return false
+ }
+
+ // Try wild card settings
+ if qt.maxRunningApps == 0 {
+ var config *LimitConfig
+ if trackType == user {
+ config = m.getUserWildCardLimitsConfig(qt.queuePath)
+ } else if trackType == group {
+ config = m.getGroupWildCardLimitsConfig(qt.queuePath)
+ }
+ if config != nil && config.maxApplications != 0 && running >
int(config.maxApplications) {
+ log.Log(log.SchedUGM).Warn("can't run app as allowing
new application to run would exceed configured max applications limit of
wildcard user/group",
+ zap.Int("tracking type", int(trackType)),
+ zap.String("queue path", queuePath),
+ zap.Int("current running applications",
len(qt.runningApplications)),
+ zap.Uint64("max running applications",
config.maxApplications))
+ return false
+ }
+ }
+ return true
+}
diff --git a/pkg/scheduler/ugm/user_tracker.go
b/pkg/scheduler/ugm/user_tracker.go
index f0c86486..6b3bba24 100644
--- a/pkg/scheduler/ugm/user_tracker.go
+++ b/pkg/scheduler/ugm/user_tracker.go
@@ -167,3 +167,9 @@ func (ut *UserTracker) canBeRemoved() bool {
defer ut.RUnlock()
return len(ut.queueTracker.childQueueTrackers) == 0 &&
len(ut.queueTracker.runningApplications) == 0
}
+
+func (ut *UserTracker) canRunApp(queuePath, applicationID string) bool {
+ ut.Lock()
+ defer ut.Unlock()
+ return ut.queueTracker.canRunApp(queuePath, applicationID, user)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]