This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/branch-1.5 by this push:
     new 5454d7d3 [YUNIKORN-2703] Core: Fallback to default queue if no placement rules match
5454d7d3 is described below

commit 5454d7d326638e971214c40c41c9527dd9c88ee1
Author: Craig Condit <[email protected]>
AuthorDate: Thu Jul 11 12:45:09 2024 -0500

    [YUNIKORN-2703] Core: Fallback to default queue if no placement rules match
---
 pkg/common/constants.go                   |  15 +-
 pkg/scheduler/partition_test.go           |  12 +-
 pkg/scheduler/placement/placement.go      |  15 ++
 pkg/scheduler/placement/placement_test.go | 268 ++++++++++++++++++++++++++++++
 pkg/scheduler/utilities_test.go           |  57 +++++++
 5 files changed, 357 insertions(+), 10 deletions(-)

diff --git a/pkg/common/constants.go b/pkg/common/constants.go
index 2b882ff7..7eae1c79 100644
--- a/pkg/common/constants.go
+++ b/pkg/common/constants.go
@@ -21,11 +21,12 @@ package common
 const (
        Empty = ""
 
-       Wildcard          = "*"
-       Separator         = ","
-       Space             = " "
-       AnonymousUser     = "nobody"
-       AnonymousGroup    = "nogroup"
-       RecoveryQueue     = "@recovery@"
-       RecoveryQueueFull = "root." + RecoveryQueue
+       Wildcard              = "*"
+       Separator             = ","
+       Space                 = " "
+       AnonymousUser         = "nobody"
+       AnonymousGroup        = "nogroup"
+       RecoveryQueue         = "@recovery@"
+       RecoveryQueueFull     = "root." + RecoveryQueue
+       DefaultPlacementQueue = "root.default"
 )
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index bd1b5900..1b55d19d 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -923,7 +923,7 @@ func TestAddApp(t *testing.T) {
 }
 
 func TestAddAppForced(t *testing.T) {
-       partition, err := newBasePartition()
+       partition, err := newBasePartitionNoRootDefault()
        assert.NilError(t, err, "partition create failed")
 
        // add a new app to an invalid queue
@@ -1465,9 +1465,9 @@ func TestUpdateQueues(t *testing.T) {
 }
 
 func TestGetApplication(t *testing.T) {
-       partition, err := newBasePartition()
+       partition, err := newBasePartitionNoRootDefault()
        assert.NilError(t, err, "partition create failed")
-       app := newApplication(appID1, "default", defQueue)
+       app := newApplication(appID1, "default", "root.custom")
        err = partition.AddApplication(app)
        assert.NilError(t, err, "no error expected while adding the 
application")
        assert.Equal(t, partition.GetApplication(appID1), app, "partition 
failed to add app incorrect app returned")
@@ -1479,6 +1479,12 @@ func TestGetApplication(t *testing.T) {
        if partition.GetApplication(appID2) != nil {
                t.Fatal("partition added app incorrectly should have failed")
        }
+
+       partition, err = newBasePartition()
+       assert.NilError(t, err, "partition create failed")
+       err = partition.AddApplication(app2)
+       assert.NilError(t, err, "no error expected while adding the 
application")
+       assert.Equal(t, partition.GetApplication(appID2), app2, "partition 
failed to add app incorrect app returned")
 }
 
 func TestGetQueue(t *testing.T) {
diff --git a/pkg/scheduler/placement/placement.go b/pkg/scheduler/placement/placement.go
index 08aac0c4..1d16ec70 100644
--- a/pkg/scheduler/placement/placement.go
+++ b/pkg/scheduler/placement/placement.go
@@ -24,6 +24,7 @@ import (
 
        "go.uber.org/zap"
 
+       "github.com/apache/yunikorn-core/pkg/common"
        "github.com/apache/yunikorn-core/pkg/common/configs"
        "github.com/apache/yunikorn-core/pkg/locking"
        "github.com/apache/yunikorn-core/pkg/log"
@@ -115,7 +116,9 @@ func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error {
        var queueName string
        var aclCheck bool
        var err error
+       var remainingRules = len(m.rules)
        for _, checkRule := range m.rules {
+               remainingRules--
                log.Log(log.Config).Debug("Executing rule for placing 
application",
                        zap.String("ruleName", checkRule.getName()),
                        zap.String("application", app.ApplicationID))
@@ -127,6 +130,18 @@ func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error {
                        app.SetQueuePath("")
                        return err
                }
+               // if no queue found even after the last rule, try to place in 
the default queue
+               if remainingRules == 0 && queueName == "" {
+                       log.Log(log.Config).Info("No rule matched, placing 
application in default queue",
+                               zap.String("application", app.ApplicationID),
+                               zap.String("defaultQueue", 
common.DefaultPlacementQueue))
+                       // get the queue object
+                       queue := m.queueFn(common.DefaultPlacementQueue)
+                       if queue != nil {
+                               // default queue exist
+                               queueName = common.DefaultPlacementQueue
+                       }
+               }
                // queueName returned make sure ACL allows access and create 
the queueName if not exist
                if queueName != "" {
                        // get the queue object
diff --git a/pkg/scheduler/placement/placement_test.go b/pkg/scheduler/placement/placement_test.go
index 947fb94f..ed4d0e3b 100644
--- a/pkg/scheduler/placement/placement_test.go
+++ b/pkg/scheduler/placement/placement_test.go
@@ -23,9 +23,11 @@ import (
 
        "gotest.tools/v3/assert"
 
+       "github.com/apache/yunikorn-core/pkg/common"
        "github.com/apache/yunikorn-core/pkg/common/configs"
        "github.com/apache/yunikorn-core/pkg/common/security"
        "github.com/apache/yunikorn-core/pkg/scheduler/placement/types"
+       siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
 )
 
 // basic test to check if no rules leave the manager unusable
@@ -256,3 +258,269 @@ partitions:
                t.Errorf("parent queue: app should not have been placed, queue: 
'%s', error: %v", queueName, err)
        }
 }
+
+func TestForcePlaceApp(t *testing.T) {
+       const (
+               provided       = "provided"
+               providedQ      = "root.provided"
+               defQ           = "root.default"
+               customDefaultQ = "root.custom"
+       )
+
+       // Create the structure for the test
+       // specifically no acl to allow on root
+       // root.default - undefined
+       data := `
+partitions:
+  - name: default
+    queues:
+      - name: root
+        submitacl: "any-user"
+        queues:
+          - name: provided
+            submitacl: "*"
+          - name: acldeny
+            submitacl: " "
+          - name: parent
+            parent: true
+            submitacl: "*"
+`
+       err := initQueueStructure([]byte(data))
+       assert.NilError(t, err, "setting up the queue config failed")
+       // update the manager
+       rules := []configs.PlacementRule{
+               {Name: "provided",
+                       Create: false},
+               {Name: "tag",
+                       Value:  "namespace",
+                       Create: true},
+       }
+       man := NewPlacementManager(rules, queueFunc)
+       if man == nil {
+               t.Fatal("placement manager create failed")
+       }
+
+       tags := make(map[string]string)
+       user := security.UserGroup{
+               User:   "any-user",
+               Groups: []string{},
+       }
+       deny := security.UserGroup{
+               User:   "deny-user",
+               Groups: []string{},
+       }
+       var tests = []struct {
+               name   string
+               queue  string
+               placed string
+               tags   map[string]string
+               user   security.UserGroup
+       }{
+               {"empty", "", "", tags, user},
+               {"provided unqualified", provided, providedQ, tags, user},
+               {"provided qualified", providedQ, providedQ, tags, user},
+               {"provided not exist", "unknown", "", tags, user},
+               {"provided parent", "root.parent", "", tags, user},
+               {"acl deny", "root.acldeny", "", tags, deny},
+               {"create", "unknown", "root.namespace", 
map[string]string{"namespace": "namespace"}, user},
+               {"deny create", "unknown", "", map[string]string{"namespace": 
"namespace"}, deny},
+               {"forced exist", providedQ, providedQ, 
map[string]string{siCommon.AppTagCreateForce: "true"}, user},
+               {"forced and create", "unknown", "root.namespace", 
map[string]string{siCommon.AppTagCreateForce: "true", "namespace": 
"namespace"}, user},
+               {"forced and deny create", "unknown", common.RecoveryQueueFull, 
map[string]string{siCommon.AppTagCreateForce: "true", "namespace": 
"namespace"}, deny},
+               {"forced parent", "root.parent", common.RecoveryQueueFull, 
map[string]string{siCommon.AppTagCreateForce: "true"}, user},
+               {"forced acl deny", "root.acldeny", common.RecoveryQueueFull, 
map[string]string{siCommon.AppTagCreateForce: "true"}, deny},
+               {"forced not exist", "unknown", common.RecoveryQueueFull, 
map[string]string{siCommon.AppTagCreateForce: "true"}, user},
+               {"forced not exist acl deny", "unknown", 
common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: 
"true"}, deny},
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       app := newApplication("app1", "default", tt.queue, 
tt.user, tt.tags, nil, "")
+                       err = man.PlaceApplication(app)
+                       if tt.placed == "" {
+                               assert.ErrorContains(t, err, "rejected", 
"unexpected error or no error returned")
+                       } else {
+                               assert.NilError(t, err, "unexpected placement 
failure")
+                               assert.Equal(t, tt.placed, app.GetQueuePath(), 
"incorrect queue set")
+                       }
+               })
+       }
+
+       // Update Queue structure
+       // root.default - defined
+       data = `
+partitions:
+  - name: default
+    queues:
+      - name: root
+        submitacl: "any-user"
+        queues:
+          - name: default
+            submitacl: "*"
+          - name: provided
+            submitacl: "*"
+          - name: acldeny
+            submitacl: " "
+          - name: parent
+            parent: true
+            submitacl: "*"
+`
+       err = initQueueStructure([]byte(data))
+       assert.NilError(t, err, "setting up the queue config failed")
+
+       tests = []struct {
+               name   string
+               queue  string
+               placed string
+               tags   map[string]string
+               user   security.UserGroup
+       }{
+               {"empty | defaulQ defined", "", defQ, tags, user},
+               {"provided unqualified | defaulQ defined", provided, providedQ, 
tags, user},
+               {"provided qualified | defaulQ defined", defQ, defQ, tags, 
user},
+               {"provided not exist | defaulQ defined", "unknown", defQ, tags, 
user},
+               {"provided parent | defaulQ defined", "root.parent", defQ, 
tags, user},
+               {"acl deny | defaulQ defined", "root.acldeny", defQ, tags, 
deny},
+               {"create | defaulQ defined", "unknown", "root.namespace", 
map[string]string{"namespace": "namespace"}, user},
+               {"deny create | defaulQ defined", "unknown", defQ, 
map[string]string{"namespace": "namespace"}, deny},
+               {"forced exist | defaulQ defined", defQ, defQ, 
map[string]string{siCommon.AppTagCreateForce: "true"}, user},
+               {"forced and create | defaulQ defined", "unknown", 
"root.namespace", map[string]string{siCommon.AppTagCreateForce: "true", 
"namespace": "namespace"}, user},
+               {"forced and deny create | defaulQ defined", "unknown", 
common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true", 
"namespace": "namespace"}, deny},
+               {"forced parent | defaulQ defined", "root.parent", 
common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: 
"true"}, user},
+               {"forced acl deny | defaulQ defined", "root.acldeny", 
common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: 
"true"}, deny},
+               {"forced not exist | defaulQ defined", "unknown", 
common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: 
"true"}, user},
+               {"forced not exist acl deny | defaulQ defined", "unknown", 
common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: 
"true"}, deny},
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       app := newApplication("app1", "default", tt.queue, 
tt.user, tt.tags, nil, "")
+                       err = man.PlaceApplication(app)
+                       if tt.placed == "" {
+                               assert.ErrorContains(t, err, "rejected", 
"unexpected error or no error returned")
+                       } else {
+                               assert.NilError(t, err, "unexpected placement 
failure")
+                               assert.Equal(t, tt.placed, app.GetQueuePath(), 
"incorrect queue set")
+                       }
+               })
+       }
+
+       // initialize queues with no root.default
+       // Add fixed placement rule to define custom default queue
+       data = `
+partitions:
+  - name: default
+    queues:
+      - name: root
+        submitacl: "any-user"
+        queues:
+          - name: custom
+            submitacl: "*"
+          - name: provided
+            submitacl: "*"
+          - name: acldeny
+            submitacl: " "
+          - name: parent
+            parent: true
+            submitacl: "*"
+`
+       err = initQueueStructure([]byte(data))
+       assert.NilError(t, err, "setting up the queue config failed")
+
+       // update the manager
+       rules = []configs.PlacementRule{
+               {Name: "provided",
+                       Create: false},
+               {Name: "tag",
+                       Value:  "namespace",
+                       Create: true},
+               {Name: "fixed",
+                       Value:  "root.custom",
+                       Create: true},
+       }
+       man1 := NewPlacementManager(rules, queueFunc)
+       if man1 == nil {
+               t.Fatal("placement manager create failed")
+       }
+
+       tests = []struct {
+               name   string
+               queue  string
+               placed string
+               tags   map[string]string
+               user   security.UserGroup
+       }{
+               {"empty | custom defaulQ", "", customDefaultQ, tags, user},
+               {"provided unqualified | custom defaulQ", provided, providedQ, 
tags, user},
+               {"provided qualified | custom defaulQ", providedQ, providedQ, 
tags, user},
+               {"provided not exist | custom defaulQ", "unknown", 
customDefaultQ, tags, user},
+               {"provided parent | custom defaulQ", "root.parent", 
customDefaultQ, tags, user},
+               {"acl deny | custom defaulQ", "root.acldeny", customDefaultQ, 
tags, deny},
+               {"create | custom defaulQ", "unknown", "root.namespace", 
map[string]string{"namespace": "namespace"}, user},
+               {"deny create | custom defaulQ", "unknown", customDefaultQ, 
map[string]string{"namespace": "namespace"}, deny},
+               {"forced exist | custom defaulQ", providedQ, providedQ, 
map[string]string{siCommon.AppTagCreateForce: "true"}, user},
+               {"forced and create | custom defaulQ", "unknown", 
"root.namespace", map[string]string{siCommon.AppTagCreateForce: "true", 
"namespace": "namespace"}, user},
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       app := newApplication("app1", "default", tt.queue, 
tt.user, tt.tags, nil, "")
+                       err = man1.PlaceApplication(app)
+                       if tt.placed == "" {
+                               assert.ErrorContains(t, err, "rejected", 
"unexpected error or no error returned")
+                       } else {
+                               assert.NilError(t, err, "unexpected placement 
failure")
+                               assert.Equal(t, tt.placed, app.GetQueuePath(), 
"incorrect queue set")
+                       }
+               })
+       }
+}
+
+func TestManagerPlaceApp_Error(t *testing.T) {
+       // Create the structure for the test
+       data := `
+partitions:
+  - name: default
+    queues:
+      - name: root
+        queues:
+          - name: testparent
+            submitacl: "*"
+            queues:
+              - name: testchild
+          - name: fixed
+            submitacl: "other-user "
+            parent: true
+`
+       err := initQueueStructure([]byte(data))
+       assert.NilError(t, err, "setting up the queue config failed")
+       // basic info without rules, manager should init
+       man := NewPlacementManager(nil, queueFunc)
+       if man == nil {
+               t.Fatal("placement manager create failed")
+       }
+       rules := []configs.PlacementRule{
+               {
+                       Name:   "user",
+                       Create: false,
+                       Parent: &configs.PlacementRule{
+                               Name:   "user",
+                               Create: false,
+                               Parent: &configs.PlacementRule{
+                                       Name:  "fixed",
+                                       Value: "testparent",
+                               },
+                       },
+               },
+       }
+       user := security.UserGroup{
+               User:   "testchild",
+               Groups: []string{},
+       }
+       tags := make(map[string]string)
+       err = man.UpdateRules(rules)
+       assert.NilError(t, err, "failed to update existing manager")
+       app := newApplication("app1", "default", "", user, tags, nil, "")
+       err = man.PlaceApplication(app)
+       queueName := app.GetQueuePath()
+       if err == nil || queueName != "" {
+               t.Errorf("failed placed app, queue: '%s', error: %v", 
queueName, err)
+       }
+}
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index e7f15359..04421ac2 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -55,6 +55,63 @@ const (
        maxapplications = "maxapplications"
 )
 
+func newBasePartitionNoRootDefault() (*PartitionContext, error) {
+       conf := configs.PartitionConfig{
+               Name: "test",
+               Queues: []configs.QueueConfig{
+                       {
+                               Name:      "root",
+                               Parent:    true,
+                               SubmitACL: "*",
+                               Queues: []configs.QueueConfig{
+                                       {
+                                               Name:   "custom",
+                                               Parent: false,
+                                               Queues: nil,
+                                               Limits: []configs.Limit{
+                                                       {
+                                                               Limit: "custom 
queue limit",
+                                                               Users: []string{
+                                                                       
"testuser",
+                                                               },
+                                                               Groups: 
[]string{
+                                                                       
"testgroup",
+                                                               },
+                                                               MaxResources: 
map[string]string{
+                                                                       
"memory": "5",
+                                                                       
"vcores": "5",
+                                                               },
+                                                               
MaxApplications: 8,
+                                                       },
+                                               },
+                                       },
+                               },
+                               Limits: []configs.Limit{
+                                       {
+                                               Limit: "root queue limit",
+                                               Users: []string{
+                                                       "testuser",
+                                               },
+                                               Groups: []string{
+                                                       "testgroup",
+                                               },
+                                               MaxResources: map[string]string{
+                                                       "memory": "10",
+                                                       "vcores": "10",
+                                               },
+                                               MaxApplications: 10,
+                                       },
+                               },
+                       },
+               },
+               PlacementRules: nil,
+               Limits:         nil,
+               NodeSortPolicy: configs.NodeSortingPolicy{},
+       }
+
+       return newPartitionContext(conf, rmID, nil)
+}
+
 func newBasePartition() (*PartitionContext, error) {
        conf := configs.PartitionConfig{
                Name: "test",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to