This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new d96cd583 [YUNIKORN-2703] Core: Fallback to default queue if no placement rules match
d96cd583 is described below

commit d96cd583305b18d97a70b6afa473208bf21d2334
Author: Mit Desai <[email protected]>
AuthorDate: Thu Jul 11 11:58:24 2024 -0500

    [YUNIKORN-2703] Core: Fallback to default queue if no placement rules match
    
    Closes: #904
    
    Signed-off-by: Craig Condit <[email protected]>
---
 pkg/common/constants.go                   |  15 ++--
 pkg/scheduler/partition_test.go           |  12 ++-
 pkg/scheduler/placement/placement.go      |  14 +++
 pkg/scheduler/placement/placement_test.go | 142 ++++++++++++++++++++++++++++--
 pkg/scheduler/utilities_test.go           |  57 ++++++++++++
 5 files changed, 224 insertions(+), 16 deletions(-)

diff --git a/pkg/common/constants.go b/pkg/common/constants.go
index 2b882ff7..7eae1c79 100644
--- a/pkg/common/constants.go
+++ b/pkg/common/constants.go
@@ -21,11 +21,12 @@ package common
 const (
        Empty = ""
 
-       Wildcard          = "*"
-       Separator         = ","
-       Space             = " "
-       AnonymousUser     = "nobody"
-       AnonymousGroup    = "nogroup"
-       RecoveryQueue     = "@recovery@"
-       RecoveryQueueFull = "root." + RecoveryQueue
+       Wildcard              = "*"
+       Separator             = ","
+       Space                 = " "
+       AnonymousUser         = "nobody"
+       AnonymousGroup        = "nogroup"
+       RecoveryQueue         = "@recovery@"
+       RecoveryQueueFull     = "root." + RecoveryQueue
+       DefaultPlacementQueue = "root.default"
 )
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index e3932187..eccbeb1e 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -904,7 +904,7 @@ func TestAddApp(t *testing.T) {
 }
 
 func TestAddAppForced(t *testing.T) {
-       partition, err := newBasePartition()
+       partition, err := newBasePartitionNoRootDefault()
        assert.NilError(t, err, "partition create failed")
 
        // add a new app to an invalid queue
@@ -1481,9 +1481,9 @@ func TestReAddQueues(t *testing.T) {
 }
 
 func TestGetApplication(t *testing.T) {
-       partition, err := newBasePartition()
+       partition, err := newBasePartitionNoRootDefault()
        assert.NilError(t, err, "partition create failed")
-       app := newApplication(appID1, "default", defQueue)
+       app := newApplication(appID1, "default", "root.custom")
        err = partition.AddApplication(app)
        assert.NilError(t, err, "no error expected while adding the 
application")
        assert.Equal(t, partition.GetApplication(appID1), app, "partition 
failed to add app incorrect app returned")
@@ -1495,6 +1495,12 @@ func TestGetApplication(t *testing.T) {
        if partition.GetApplication(appID2) != nil {
                t.Fatal("partition added app incorrectly should have failed")
        }
+
+       partition, err = newBasePartition()
+       assert.NilError(t, err, "partition create failed")
+       err = partition.AddApplication(app2)
+       assert.NilError(t, err, "no error expected while adding the 
application")
+       assert.Equal(t, partition.GetApplication(appID2), app2, "partition 
failed to add app incorrect app returned")
 }
 
 func TestGetQueue(t *testing.T) {
diff --git a/pkg/scheduler/placement/placement.go b/pkg/scheduler/placement/placement.go
index 196ccfd1..10e3e663 100644
--- a/pkg/scheduler/placement/placement.go
+++ b/pkg/scheduler/placement/placement.go
@@ -104,7 +104,9 @@ func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error {
 
        var queueName string
        var err error
+       var remainingRules = len(m.rules)
        for _, checkRule := range m.rules {
+               remainingRules--
                log.Log(log.SchedApplication).Debug("Executing rule for placing 
application",
                        zap.String("ruleName", checkRule.getName()),
                        zap.String("application", app.ApplicationID))
@@ -116,6 +118,18 @@ func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error {
                        app.SetQueuePath("")
                        return err
                }
+               // if no queue found even after the last rule, try to place in 
the default queue
+               if remainingRules == 0 && queueName == "" {
+                       log.Log(log.Config).Info("No rule matched, placing 
application in default queue",
+                               zap.String("application", app.ApplicationID),
+                               zap.String("defaultQueue", 
common.DefaultPlacementQueue))
+                       // get the queue object
+                       queue := m.queueFn(common.DefaultPlacementQueue)
+                       if queue != nil {
+                               // default queue exist
+                               queueName = common.DefaultPlacementQueue
+                       }
+               }
                // no queue name next rule
                if queueName == "" {
                        continue
diff --git a/pkg/scheduler/placement/placement_test.go b/pkg/scheduler/placement/placement_test.go
index ef05c95e..5db70118 100644
--- a/pkg/scheduler/placement/placement_test.go
+++ b/pkg/scheduler/placement/placement_test.go
@@ -328,12 +328,15 @@ partitions:
 
 func TestForcePlaceApp(t *testing.T) {
        const (
-               def  = "default"
-               defQ = "root.default"
+               provided       = "provided"
+               providedQ      = "root.provided"
+               defQ           = "root.default"
+               customDefaultQ = "root.custom"
        )
 
        // Create the structure for the test
        // specifically no acl to allow on root
+       // root.default - undefined
        data := `
 partitions:
   - name: default
@@ -341,7 +344,7 @@ partitions:
       - name: root
         submitacl: "any-user"
         queues:
-          - name: default
+          - name: provided
             submitacl: "*"
           - name: acldeny
             submitacl: " "
@@ -381,14 +384,14 @@ partitions:
                user   security.UserGroup
        }{
                {"empty", "", "", tags, user},
-               {"provided unqualified", def, defQ, tags, user},
-               {"provided qualified", defQ, defQ, tags, user},
+               {"provided unqualified", provided, providedQ, tags, user},
+               {"provided qualified", providedQ, providedQ, tags, user},
                {"provided not exist", "unknown", "", tags, user},
                {"provided parent", "root.parent", "", tags, user},
                {"acl deny", "root.acldeny", "", tags, deny},
                {"create", "unknown", "root.namespace", 
map[string]string{"namespace": "namespace"}, user},
                {"deny create", "unknown", "", map[string]string{"namespace": 
"namespace"}, deny},
-               {"forced exist", defQ, defQ, 
map[string]string{siCommon.AppTagCreateForce: "true"}, user},
+               {"forced exist", providedQ, providedQ, 
map[string]string{siCommon.AppTagCreateForce: "true"}, user},
                {"forced and create", "unknown", "root.namespace", 
map[string]string{siCommon.AppTagCreateForce: "true", "namespace": 
"namespace"}, user},
                {"forced and deny create", "unknown", common.RecoveryQueueFull, 
map[string]string{siCommon.AppTagCreateForce: "true", "namespace": 
"namespace"}, deny},
                {"forced parent", "root.parent", common.RecoveryQueueFull, 
map[string]string{siCommon.AppTagCreateForce: "true"}, user},
@@ -408,6 +411,133 @@ partitions:
                        }
                })
        }
+
+       // Update Queue structure
+       // root.default - defined
+       data = `
+partitions:
+  - name: default
+    queues:
+      - name: root
+        submitacl: "any-user"
+        queues:
+          - name: default
+            submitacl: "*"
+          - name: provided
+            submitacl: "*"
+          - name: acldeny
+            submitacl: " "
+          - name: parent
+            parent: true
+            submitacl: "*"
+`
+       err = initQueueStructure([]byte(data))
+       assert.NilError(t, err, "setting up the queue config failed")
+
+       tests = []struct {
+               name   string
+               queue  string
+               placed string
+               tags   map[string]string
+               user   security.UserGroup
+       }{
+               {"empty | defaulQ defined", "", defQ, tags, user},
+               {"provided unqualified | defaulQ defined", provided, providedQ, 
tags, user},
+               {"provided qualified | defaulQ defined", defQ, defQ, tags, 
user},
+               {"provided not exist | defaulQ defined", "unknown", defQ, tags, 
user},
+               {"provided parent | defaulQ defined", "root.parent", defQ, 
tags, user},
+               {"acl deny | defaulQ defined", "root.acldeny", defQ, tags, 
deny},
+               {"create | defaulQ defined", "unknown", "root.namespace", 
map[string]string{"namespace": "namespace"}, user},
+               {"deny create | defaulQ defined", "unknown", defQ, 
map[string]string{"namespace": "namespace"}, deny},
+               {"forced exist | defaulQ defined", defQ, defQ, 
map[string]string{siCommon.AppTagCreateForce: "true"}, user},
+               {"forced and create | defaulQ defined", "unknown", 
"root.namespace", map[string]string{siCommon.AppTagCreateForce: "true", 
"namespace": "namespace"}, user},
+               {"forced and deny create | defaulQ defined", "unknown", 
common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true", 
"namespace": "namespace"}, deny},
+               {"forced parent | defaulQ defined", "root.parent", 
common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: 
"true"}, user},
+               {"forced acl deny | defaulQ defined", "root.acldeny", 
common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: 
"true"}, deny},
+               {"forced not exist | defaulQ defined", "unknown", 
common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: 
"true"}, user},
+               {"forced not exist acl deny | defaulQ defined", "unknown", 
common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: 
"true"}, deny},
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       app := newApplication("app1", "default", tt.queue, 
tt.user, tt.tags, nil, "")
+                       err = man.PlaceApplication(app)
+                       if tt.placed == "" {
+                               assert.Assert(t, errors.Is(err, RejectedError), 
"unexpected error or no error returned")
+                       } else {
+                               assert.NilError(t, err, "unexpected placement 
failure")
+                               assert.Equal(t, tt.placed, app.GetQueuePath(), 
"incorrect queue set")
+                       }
+               })
+       }
+
+       // initialize queues with no root.default
+       // Add fixed placement rule to define custom default queue
+       data = `
+partitions:
+  - name: default
+    queues:
+      - name: root
+        submitacl: "any-user"
+        queues:
+          - name: custom
+            submitacl: "*"
+          - name: provided
+            submitacl: "*"
+          - name: acldeny
+            submitacl: " "
+          - name: parent
+            parent: true
+            submitacl: "*"
+`
+       err = initQueueStructure([]byte(data))
+       assert.NilError(t, err, "setting up the queue config failed")
+
+       // update the manager
+       rules = []configs.PlacementRule{
+               {Name: "provided",
+                       Create: false},
+               {Name: "tag",
+                       Value:  "namespace",
+                       Create: true},
+               {Name: "fixed",
+                       Value:  "root.custom",
+                       Create: true},
+       }
+       man1 := NewPlacementManager(rules, queueFunc)
+       if man1 == nil {
+               t.Fatal("placement manager create failed")
+       }
+
+       tests = []struct {
+               name   string
+               queue  string
+               placed string
+               tags   map[string]string
+               user   security.UserGroup
+       }{
+               {"empty | custom defaulQ", "", customDefaultQ, tags, user},
+               {"provided unqualified | custom defaulQ", provided, providedQ, 
tags, user},
+               {"provided qualified | custom defaulQ", providedQ, providedQ, 
tags, user},
+               {"provided not exist | custom defaulQ", "unknown", 
customDefaultQ, tags, user},
+               {"provided parent | custom defaulQ", "root.parent", 
customDefaultQ, tags, user},
+               {"acl deny | custom defaulQ", "root.acldeny", customDefaultQ, 
tags, deny},
+               {"create | custom defaulQ", "unknown", "root.namespace", 
map[string]string{"namespace": "namespace"}, user},
+               {"deny create | custom defaulQ", "unknown", customDefaultQ, 
map[string]string{"namespace": "namespace"}, deny},
+               {"forced exist | custom defaulQ", providedQ, providedQ, 
map[string]string{siCommon.AppTagCreateForce: "true"}, user},
+               {"forced and create | custom defaulQ", "unknown", 
"root.namespace", map[string]string{siCommon.AppTagCreateForce: "true", 
"namespace": "namespace"}, user},
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       app := newApplication("app1", "default", tt.queue, 
tt.user, tt.tags, nil, "")
+                       err = man1.PlaceApplication(app)
+                       if tt.placed == "" {
+                               assert.Assert(t, errors.Is(err, RejectedError), 
"unexpected error or no error returned")
+                       } else {
+                               assert.NilError(t, err, "unexpected placement 
failure")
+                               assert.Equal(t, tt.placed, app.GetQueuePath(), 
"incorrect queue set")
+                       }
+               })
+       }
 }
 
 func TestManagerPlaceApp_Error(t *testing.T) {
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index c9fade0a..ec0f9490 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -54,6 +54,63 @@ const (
        maxapplications = "maxapplications"
 )
 
+func newBasePartitionNoRootDefault() (*PartitionContext, error) {
+       conf := configs.PartitionConfig{
+               Name: "test",
+               Queues: []configs.QueueConfig{
+                       {
+                               Name:      "root",
+                               Parent:    true,
+                               SubmitACL: "*",
+                               Queues: []configs.QueueConfig{
+                                       {
+                                               Name:   "custom",
+                                               Parent: false,
+                                               Queues: nil,
+                                               Limits: []configs.Limit{
+                                                       {
+                                                               Limit: "custom 
queue limit",
+                                                               Users: []string{
+                                                                       
"testuser",
+                                                               },
+                                                               Groups: 
[]string{
+                                                                       
"testgroup",
+                                                               },
+                                                               MaxResources: 
map[string]string{
+                                                                       
"memory": "5",
+                                                                       
"vcores": "5",
+                                                               },
+                                                               
MaxApplications: 8,
+                                                       },
+                                               },
+                                       },
+                               },
+                               Limits: []configs.Limit{
+                                       {
+                                               Limit: "root queue limit",
+                                               Users: []string{
+                                                       "testuser",
+                                               },
+                                               Groups: []string{
+                                                       "testgroup",
+                                               },
+                                               MaxResources: map[string]string{
+                                                       "memory": "10",
+                                                       "vcores": "10",
+                                               },
+                                               MaxApplications: 10,
+                                       },
+                               },
+                       },
+               },
+               PlacementRules: nil,
+               Limits:         nil,
+               NodeSortPolicy: configs.NodeSortingPolicy{},
+       }
+
+       return newPartitionContext(conf, rmID, nil)
+}
+
 func newBasePartition() (*PartitionContext, error) {
        conf := configs.PartitionConfig{
                Name: "test",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to