This is an automated email from the ASF dual-hosted git repository.

mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new ca58806f [YUNIKORN-1917] enforce ugm maxapplications rule (#621)
ca58806f is described below

commit ca58806fcc83d6b500cac78fd87964396b370a95
Author: Frank Yang <[email protected]>
AuthorDate: Tue Aug 22 12:28:34 2023 +0530

    [YUNIKORN-1917] enforce ugm maxapplications rule (#621)
    
    Closes: #621
    
    Signed-off-by: Manikandan R <[email protected]>
---
 pkg/scheduler/objects/queue.go     |   3 +-
 pkg/scheduler/partition_test.go    | 155 +++++++++++++++++++++++++++++++++++++
 pkg/scheduler/ugm/group_tracker.go |   6 ++
 pkg/scheduler/ugm/manager.go       |  29 +++++++
 pkg/scheduler/ugm/manager_test.go  | 128 ++++++++++++++++++++++++++++++
 pkg/scheduler/ugm/queue_tracker.go |  52 +++++++++++++
 pkg/scheduler/ugm/user_tracker.go  |   6 ++
 7 files changed, 378 insertions(+), 1 deletion(-)

diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 32255d01..902cfe20 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -37,6 +37,7 @@ import (
        "github.com/apache/yunikorn-core/pkg/metrics"
        "github.com/apache/yunikorn-core/pkg/scheduler/objects/template"
        "github.com/apache/yunikorn-core/pkg/scheduler/policies"
+       "github.com/apache/yunikorn-core/pkg/scheduler/ugm"
        "github.com/apache/yunikorn-core/pkg/webservice/dao"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
 )
@@ -1297,7 +1298,7 @@ func (sq *Queue) TryAllocate(iterator func() 
NodeIterator, fullIterator func() N
 
                // process the apps (filters out app without pending requests)
                for _, app := range sq.sortApplications(true, false) {
-                       if app.IsAccepted() && !sq.canRunApp(app.ApplicationID) 
{
+                       if app.IsAccepted() && 
(!sq.canRunApp(app.ApplicationID) || 
!ugm.GetUserManager().CanRunApp(sq.QueuePath, app.ApplicationID, app.user)) {
                                continue
                        }
                        alloc := app.tryAllocate(headRoom, preemptionDelay, 
&preemptAttemptsRemaining, iterator, fullIterator, getnode)
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 9f4c8125..88381404 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -3642,3 +3642,158 @@ func TestReservationTracking(t *testing.T) {
        assert.Equal(t, 0, partition.getReservationCount())
        assert.Equal(t, "alloc-2", alloc.GetAllocationKey())
 }
+
+// TestLimitMaxApplications verifies that the scheduler enforces the user/group
+// max applications limit while allocating. Every test case configures limits on
+// root.default whose effective max applications value is 1, set through a
+// specific user, a specific group, a wildcard user or a wildcard group entry.
+// The first application must get its ask allocated; the second application is
+// accepted but must not receive an allocation.
+// NOTE(review): assumes newApplication submits as user "testuser" in group
+// "testgroup" - confirm against the test helper.
+//
+//nolint:funlen
+func TestLimitMaxApplications(t *testing.T) {
+	testCases := []struct {
+		name   string
+		limits []configs.Limit
+	}{
+		{
+			name: "specific user",
+			limits: []configs.Limit{
+				{
+					Limit:           "specific user limit",
+					Users:           []string{"testuser"},
+					MaxResources:    map[string]string{"memory": "5", "vcores": "5"},
+					MaxApplications: 1,
+				},
+			},
+		},
+		{
+			name: "specific group",
+			limits: []configs.Limit{
+				{
+					Limit:           "specific group limit",
+					Groups:          []string{"testgroup"},
+					MaxResources:    map[string]string{"memory": "5", "vcores": "5"},
+					MaxApplications: 1,
+				},
+			},
+		},
+		{
+			name: "wildcard user",
+			limits: []configs.Limit{
+				{
+					Limit:           "wildcard user limit",
+					Users:           []string{"*"},
+					MaxResources:    map[string]string{"memory": "5", "vcores": "5"},
+					MaxApplications: 1,
+				},
+			},
+		},
+		{
+			name: "wildcard group",
+			limits: []configs.Limit{
+				{
+					Limit:           "specific group limit",
+					Groups:          []string{"nonexistent-group"},
+					MaxResources:    map[string]string{"memory": "500", "vcores": "500"},
+					MaxApplications: 100,
+				},
+				{
+					Limit:           "wildcard group limit",
+					Groups:          []string{"*"},
+					MaxResources:    map[string]string{"memory": "5", "vcores": "5"},
+					MaxApplications: 1,
+				},
+			},
+		},
+		{
+			name: "specific user lower than specific group limit",
+			limits: []configs.Limit{
+				{
+					Limit:           "specific user limit",
+					Users:           []string{"testuser"},
+					MaxResources:    map[string]string{"memory": "5", "vcores": "5"},
+					MaxApplications: 1,
+				},
+				{
+					Limit:           "specific group limit",
+					Groups:          []string{"testgroup"},
+					MaxResources:    map[string]string{"memory": "5", "vcores": "5"},
+					MaxApplications: 100,
+				},
+			},
+		},
+		{
+			name: "specific group lower than specific user limit",
+			limits: []configs.Limit{
+				{
+					Limit:           "specific user limit",
+					Users:           []string{"testuser"},
+					MaxResources:    map[string]string{"memory": "5", "vcores": "5"},
+					MaxApplications: 100,
+				},
+				{
+					Limit:           "specific group limit",
+					Groups:          []string{"testgroup"},
+					MaxResources:    map[string]string{"memory": "5", "vcores": "5"},
+					MaxApplications: 1,
+				},
+			},
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// start every case from a clean user/group manager state
+			setupUGM()
+			conf := configs.PartitionConfig{
+				Name: "default",
+				Queues: []configs.QueueConfig{
+					{
+						Name:      "root",
+						Parent:    true,
+						SubmitACL: "*",
+						Queues: []configs.QueueConfig{
+							{
+								Name:   "default",
+								Parent: false,
+								Limits: tc.limits,
+							},
+						},
+					},
+				},
+				NodeSortPolicy: configs.NodeSortingPolicy{},
+			}
+
+			partition, err := newPartitionContext(conf, rmID, nil)
+			assert.NilError(t, err, "partition create failed")
+
+			// add node1: large enough to fit both asks so only the app limit can block
+			nodeRes, err := resources.NewResourceFromConf(map[string]string{"memory": "10", "vcores": "10"})
+			assert.NilError(t, err, "failed to create basic resource")
+			err = partition.AddNode(newNodeMaxResource("node-1", nodeRes), nil)
+			assert.NilError(t, err, "test node1 add failed unexpected")
+
+			resMap := map[string]string{"memory": "2", "vcores": "2"}
+			res, err := resources.NewResourceFromConf(resMap)
+			assert.NilError(t, err, "Unexpected error when creating resource from map")
+
+			// add app1
+			app1 := newApplication(appID1, "default", defQueue)
+			err = partition.AddApplication(app1)
+			assert.NilError(t, err, "add application to partition should not have failed")
+			err = app1.AddAllocationAsk(newAllocationAsk(allocID, appID1, res))
+			assert.NilError(t, err, "failed to add ask alloc-1 to app-1")
+
+			alloc := partition.tryAllocate()
+			if alloc == nil {
+				t.Fatal("allocation did not return any allocation")
+			}
+			assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated")
+			assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application app-1 to be allocated")
+			assert.Equal(t, alloc.GetAllocationKey(), allocID, "expected ask alloc-1 to be allocated")
+
+			// add app2: accepted, but must stay unscheduled because the limit of 1 is reached
+			app2 := newApplication(appID2, "default", defQueue)
+			err = partition.AddApplication(app2)
+			assert.NilError(t, err, "add application to partition should not have failed")
+			err = app2.AddAllocationAsk(newAllocationAsk(allocID2, appID2, res))
+			assert.NilError(t, err, "failed to add ask alloc-2 to app-2")
+			assert.Equal(t, app2.CurrentState(), objects.Accepted.String(), "application should have moved to accepted state")
+
+			alloc = partition.tryAllocate()
+			assert.Assert(t, alloc == nil, "allocation should not have happened as max apps reached")
+		})
+	}
+}
diff --git a/pkg/scheduler/ugm/group_tracker.go 
b/pkg/scheduler/ugm/group_tracker.go
index 8fde8618..150016c5 100644
--- a/pkg/scheduler/ugm/group_tracker.go
+++ b/pkg/scheduler/ugm/group_tracker.go
@@ -145,3 +145,9 @@ func (gt *GroupTracker) 
decreaseAllTrackedResourceUsage(queuePath string) map[st
        }
        return removedApplications
 }
+
+// canRunApp asks the group's queue tracker whether the application may run in
+// the given queue path, using the group tracking type. The tracker lock is held
+// for the duration of the check.
+func (gt *GroupTracker) canRunApp(queuePath, applicationID string) bool {
+	gt.Lock()
+	defer gt.Unlock()
+	tracker := gt.queueTracker
+	return tracker.canRunApp(queuePath, applicationID, group)
+}
diff --git a/pkg/scheduler/ugm/manager.go b/pkg/scheduler/ugm/manager.go
index 76141573..fd40aafe 100644
--- a/pkg/scheduler/ugm/manager.go
+++ b/pkg/scheduler/ugm/manager.go
@@ -584,6 +584,35 @@ func (m *Manager) Headroom(queuePath, applicationID 
string, user security.UserGr
        return resources.ComponentWiseMinPermissive(userHeadroom, groupHeadroom)
 }
 
+// CanRunApp reports whether the application, running as the given user and
+// group, is allowed by the configured maxApplications limits for the queue path.
+// The user tracker is always consulted. The group tracker is consulted only when
+// a group is linked to the application and a tracker exists for that group;
+// in that case both checks must pass.
+func (m *Manager) CanRunApp(queuePath, applicationID string, user security.UserGroup) bool {
+	ut := m.getUserTracker(user.User)
+	userAllowed := ut.canRunApp(queuePath, applicationID)
+	log.Log(log.SchedUGM).Debug("Check whether user can run app",
+		zap.String("user", user.User),
+		zap.String("queue path", queuePath),
+		zap.Bool("can run app", userAllowed))
+
+	// link a group tracker to this application when none has been linked yet
+	if linked := ut.hasGroupForApp(applicationID); !linked {
+		m.ensureGroupTrackerForApp(queuePath, applicationID, user)
+	}
+
+	// without group tracking the user result is the final answer
+	groupName := ut.getGroupForApp(applicationID)
+	if groupName == common.Empty {
+		return userAllowed
+	}
+	gt := m.GetGroupTracker(groupName)
+	if gt == nil {
+		return userAllowed
+	}
+
+	groupAllowed := gt.canRunApp(queuePath, applicationID)
+	log.Log(log.SchedUGM).Debug("Check whether group can run app",
+		zap.String("group", groupName),
+		zap.String("queue path", queuePath),
+		zap.Bool("can run app", groupAllowed))
+	if !userAllowed {
+		return false
+	}
+	return groupAllowed
+}
+
 // ClearUserTrackers only for tests
 func (m *Manager) ClearUserTrackers() {
        m.Lock()
diff --git a/pkg/scheduler/ugm/manager_test.go 
b/pkg/scheduler/ugm/manager_test.go
index d39eea4f..7d54195c 100644
--- a/pkg/scheduler/ugm/manager_test.go
+++ b/pkg/scheduler/ugm/manager_test.go
@@ -19,6 +19,7 @@
 package ugm
 
 import (
+       "fmt"
        "strconv"
        "testing"
 
@@ -674,6 +675,133 @@ func TestDecreaseTrackedResourceForGroupTracker(t 
*testing.T) {
        assert.Equal(t, 
resources.Equals(groupTracker.queueTracker.childQueueTrackers["parent"].resourceUsage,
 resources.Zero), true)
 }
 
+// TestCanRunApp verifies Manager.CanRunApp against limits set on root.default.
+// Every test case yields an effective max applications value of 1 for user1 in
+// group1, set through a specific user, a specific group, a wildcard user or a
+// wildcard group entry. The first application must be allowed to run; after its
+// usage is tracked the second application must be rejected.
+//
+//nolint:funlen
+func TestCanRunApp(t *testing.T) {
+	testCases := []struct {
+		name   string
+		limits []configs.Limit
+	}{
+		{
+			name: "specific user limit",
+			limits: []configs.Limit{
+				{
+					Limit:           "specific user limit",
+					Users:           []string{"user1"},
+					MaxApplications: 1,
+				},
+			},
+		},
+		{
+			name: "specific group limit",
+			limits: []configs.Limit{
+				{
+					Limit:           "specific group limit",
+					Groups:          []string{"group1"},
+					MaxApplications: 1,
+				},
+			},
+		},
+		{
+			name: "wildcard user limit",
+			limits: []configs.Limit{
+				{
+					Limit:           "wildcard user limit",
+					Users:           []string{"*"},
+					MaxApplications: 1,
+				},
+			},
+		},
+		{
+			name: "wildcard group limit",
+			limits: []configs.Limit{
+				{
+					Limit:           "specific group limit",
+					Groups:          []string{"nonexistent-group"},
+					MaxApplications: 100,
+				},
+				{
+					Limit:           "wildcard group limit",
+					Groups:          []string{"*"},
+					MaxApplications: 1,
+				},
+			},
+		},
+		{
+			name: "specific user lower than specific group limit",
+			limits: []configs.Limit{
+				{
+					Limit:           "specific user limit",
+					Users:           []string{"user1"},
+					MaxApplications: 1,
+				},
+				{
+					Limit:           "specific group limit",
+					Groups:          []string{"group1"},
+					MaxApplications: 100,
+				},
+			},
+		},
+		{
+			name: "specific group lower than specific user limit",
+			limits: []configs.Limit{
+				{
+					Limit:           "specific user limit",
+					Users:           []string{"user1"},
+					MaxApplications: 100,
+				},
+				{
+					Limit:           "specific group limit",
+					Groups:          []string{"group1"},
+					MaxApplications: 1,
+				},
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// start every case from a clean user/group manager state
+			setupUGM()
+			// Queue setup:
+			// root->default
+			conf := configs.PartitionConfig{
+				Name: "default",
+				Queues: []configs.QueueConfig{
+					{
+						Name:      "root",
+						Parent:    true,
+						SubmitACL: "*",
+						Queues: []configs.QueueConfig{
+							{
+								Name:      "default",
+								Parent:    false,
+								SubmitACL: "*",
+								Limits:    tc.limits,
+							},
+						},
+					},
+				},
+			}
+			manager := GetUserManager()
+			assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+
+			user := security.UserGroup{User: "user1", Groups: []string{"group1"}}
+			usage, err := resources.NewResourceFromConf(map[string]string{"memory": "50"})
+			// stop right away: the remaining steps need a valid resource
+			assert.NilError(t, err, "new resource create returned error")
+
+			canRunApp := manager.CanRunApp("root.default", TestApp1, user)
+			assert.Equal(t, canRunApp, true, fmt.Sprintf("user %s should be able to run app %s", user.User, TestApp1))
+
+			// tracking usage for the first app makes it count as a running application
+			increased := manager.IncreaseTrackedResource("root.default", TestApp1, usage, user)
+			assert.Equal(t, increased, true, "unable to increase tracked resource: queuepath root.default, app "+TestApp1+", res "+usage.String())
+
+			canRunApp = manager.CanRunApp("root.default", TestApp2, user)
+			assert.Equal(t, canRunApp, false, fmt.Sprintf("user %s shouldn't be able to run app %s", user.User, TestApp2))
+		})
+	}
+}
+
 func createUpdateConfigWithWildCardUsersAndGroups(user string, group string, 
wildUser string, wildGroup string, memory string, vcores string) 
configs.PartitionConfig {
        conf := configs.PartitionConfig{
                Name: "test",
diff --git a/pkg/scheduler/ugm/queue_tracker.go 
b/pkg/scheduler/ugm/queue_tracker.go
index edcb0c07..8dc0fe99 100644
--- a/pkg/scheduler/ugm/queue_tracker.go
+++ b/pkg/scheduler/ugm/queue_tracker.go
@@ -400,3 +400,55 @@ func (qt *QueueTracker) 
decreaseTrackedResourceUsageUpwards(queuePath string) {
                qt.runningApplications = make(map[string]bool)
        }
 }
+
+// canRunApp reports whether applicationID may run in queuePath without exceeding
+// the max running applications limit tracked at this level or below.
+//
+// The child tracker for the next element of queuePath, when one exists, is
+// checked first and a rejection there is final. At this level an application
+// already present in runningApplications is always allowed. Otherwise the count
+// including the new application is compared against the limit set on this
+// tracker, or, when that limit is 0, against the wildcard user or group limit
+// the manager holds for this queue. A limit of 0 means no limit is enforced.
+//
+// Counts are compared as uint64 so a very large configured limit is never
+// narrowed into a negative int.
+func (qt *QueueTracker) canRunApp(queuePath string, applicationID string, trackType trackingType) bool {
+	log.Log(log.SchedUGM).Debug("Checking can run app",
+		zap.Int("tracking type", int(trackType)),
+		zap.String("queue path", queuePath),
+		zap.String("application", applicationID))
+	childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
+	if childQueuePath != common.Empty {
+		child := qt.childQueueTrackers[immediateChildQueueName]
+		if child != nil && !child.canRunApp(childQueuePath, applicationID, trackType) {
+			return false
+		}
+	}
+
+	// an application that is already tracked as running does not count twice
+	if qt.runningApplications[applicationID] {
+		return true
+	}
+	running := uint64(len(qt.runningApplications)) + 1
+
+	// apply user/group specific limit settings set if configured, otherwise use wild card limit settings
+	if qt.maxRunningApps != 0 {
+		if running > qt.maxRunningApps {
+			log.Log(log.SchedUGM).Warn("can't run app as allowing new application to run would exceed configured max applications limit of specific user/group",
+				zap.Int("tracking type", int(trackType)),
+				zap.String("queue path", queuePath),
+				zap.Int("current running applications", len(qt.runningApplications)),
+				zap.Uint64("max running applications", qt.maxRunningApps))
+			return false
+		}
+		return true
+	}
+
+	// Try wild card settings
+	var config *LimitConfig
+	switch trackType {
+	case user:
+		config = m.getUserWildCardLimitsConfig(qt.queuePath)
+	case group:
+		config = m.getGroupWildCardLimitsConfig(qt.queuePath)
+	}
+	if config != nil && config.maxApplications != 0 && running > config.maxApplications {
+		log.Log(log.SchedUGM).Warn("can't run app as allowing new application to run would exceed configured max applications limit of wildcard user/group",
+			zap.Int("tracking type", int(trackType)),
+			zap.String("queue path", queuePath),
+			zap.Int("current running applications", len(qt.runningApplications)),
+			zap.Uint64("max running applications", config.maxApplications))
+		return false
+	}
+	return true
+}
diff --git a/pkg/scheduler/ugm/user_tracker.go 
b/pkg/scheduler/ugm/user_tracker.go
index f0c86486..6b3bba24 100644
--- a/pkg/scheduler/ugm/user_tracker.go
+++ b/pkg/scheduler/ugm/user_tracker.go
@@ -167,3 +167,9 @@ func (ut *UserTracker) canBeRemoved() bool {
        defer ut.RUnlock()
        return len(ut.queueTracker.childQueueTrackers) == 0 && 
len(ut.queueTracker.runningApplications) == 0
 }
+
+// canRunApp asks the user's queue tracker whether the application may run in
+// the given queue path, using the user tracking type. The tracker lock is held
+// for the duration of the check.
+func (ut *UserTracker) canRunApp(queuePath, applicationID string) bool {
+	ut.Lock()
+	defer ut.Unlock()
+	tracker := ut.queueTracker
+	return tracker.canRunApp(queuePath, applicationID, user)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to