This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new d96cd583 [YUNIKORN-2703] Core: Fallback to default queue if no placement rules match
d96cd583 is described below
commit d96cd583305b18d97a70b6afa473208bf21d2334
Author: Mit Desai <[email protected]>
AuthorDate: Thu Jul 11 11:58:24 2024 -0500
[YUNIKORN-2703] Core: Fallback to default queue if no placement rules match
Closes: #904
Signed-off-by: Craig Condit <[email protected]>
---
pkg/common/constants.go | 15 ++--
pkg/scheduler/partition_test.go | 12 ++-
pkg/scheduler/placement/placement.go | 14 +++
pkg/scheduler/placement/placement_test.go | 142 ++++++++++++++++++++++++++++--
pkg/scheduler/utilities_test.go | 57 ++++++++++++
5 files changed, 224 insertions(+), 16 deletions(-)
diff --git a/pkg/common/constants.go b/pkg/common/constants.go
index 2b882ff7..7eae1c79 100644
--- a/pkg/common/constants.go
+++ b/pkg/common/constants.go
@@ -21,11 +21,12 @@ package common
const (
Empty = ""
- Wildcard = "*"
- Separator = ","
- Space = " "
- AnonymousUser = "nobody"
- AnonymousGroup = "nogroup"
- RecoveryQueue = "@recovery@"
- RecoveryQueueFull = "root." + RecoveryQueue
+ Wildcard = "*"
+ Separator = ","
+ Space = " "
+ AnonymousUser = "nobody"
+ AnonymousGroup = "nogroup"
+ RecoveryQueue = "@recovery@"
+ RecoveryQueueFull = "root." + RecoveryQueue
+ DefaultPlacementQueue = "root.default"
)
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index e3932187..eccbeb1e 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -904,7 +904,7 @@ func TestAddApp(t *testing.T) {
}
func TestAddAppForced(t *testing.T) {
- partition, err := newBasePartition()
+ partition, err := newBasePartitionNoRootDefault()
assert.NilError(t, err, "partition create failed")
// add a new app to an invalid queue
@@ -1481,9 +1481,9 @@ func TestReAddQueues(t *testing.T) {
}
func TestGetApplication(t *testing.T) {
- partition, err := newBasePartition()
+ partition, err := newBasePartitionNoRootDefault()
assert.NilError(t, err, "partition create failed")
- app := newApplication(appID1, "default", defQueue)
+ app := newApplication(appID1, "default", "root.custom")
err = partition.AddApplication(app)
assert.NilError(t, err, "no error expected while adding the application")
assert.Equal(t, partition.GetApplication(appID1), app, "partition failed to add app incorrect app returned")
@@ -1495,6 +1495,12 @@ func TestGetApplication(t *testing.T) {
if partition.GetApplication(appID2) != nil {
t.Fatal("partition added app incorrectly should have failed")
}
+
+ partition, err = newBasePartition()
+ assert.NilError(t, err, "partition create failed")
+ err = partition.AddApplication(app2)
+ assert.NilError(t, err, "no error expected while adding the application")
+ assert.Equal(t, partition.GetApplication(appID2), app2, "partition failed to add app incorrect app returned")
}
func TestGetQueue(t *testing.T) {
diff --git a/pkg/scheduler/placement/placement.go b/pkg/scheduler/placement/placement.go
index 196ccfd1..10e3e663 100644
--- a/pkg/scheduler/placement/placement.go
+++ b/pkg/scheduler/placement/placement.go
@@ -104,7 +104,9 @@ func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error {
var queueName string
var err error
+ var remainingRules = len(m.rules)
for _, checkRule := range m.rules {
+ remainingRules--
log.Log(log.SchedApplication).Debug("Executing rule for placing application",
zap.String("ruleName", checkRule.getName()),
zap.String("application", app.ApplicationID))
@@ -116,6 +118,18 @@ func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error {
app.SetQueuePath("")
return err
}
+ // if no queue found even after the last rule, try to place in the default queue
+ if remainingRules == 0 && queueName == "" {
+ log.Log(log.Config).Info("No rule matched, placing application in default queue",
+ zap.String("application", app.ApplicationID),
+ zap.String("defaultQueue", common.DefaultPlacementQueue))
+ // get the queue object
+ queue := m.queueFn(common.DefaultPlacementQueue)
+ if queue != nil {
+ // default queue exist
+ queueName = common.DefaultPlacementQueue
+ }
+ }
// no queue name next rule
if queueName == "" {
continue
diff --git a/pkg/scheduler/placement/placement_test.go b/pkg/scheduler/placement/placement_test.go
index ef05c95e..5db70118 100644
--- a/pkg/scheduler/placement/placement_test.go
+++ b/pkg/scheduler/placement/placement_test.go
@@ -328,12 +328,15 @@ partitions:
func TestForcePlaceApp(t *testing.T) {
const (
- def = "default"
- defQ = "root.default"
+ provided = "provided"
+ providedQ = "root.provided"
+ defQ = "root.default"
+ customDefaultQ = "root.custom"
)
// Create the structure for the test
// specifically no acl to allow on root
+ // root.default - undefined
data := `
partitions:
- name: default
@@ -341,7 +344,7 @@ partitions:
- name: root
submitacl: "any-user"
queues:
- - name: default
+ - name: provided
submitacl: "*"
- name: acldeny
submitacl: " "
@@ -381,14 +384,14 @@ partitions:
user security.UserGroup
}{
{"empty", "", "", tags, user},
- {"provided unqualified", def, defQ, tags, user},
- {"provided qualified", defQ, defQ, tags, user},
+ {"provided unqualified", provided, providedQ, tags, user},
+ {"provided qualified", providedQ, providedQ, tags, user},
{"provided not exist", "unknown", "", tags, user},
{"provided parent", "root.parent", "", tags, user},
{"acl deny", "root.acldeny", "", tags, deny},
{"create", "unknown", "root.namespace",
map[string]string{"namespace": "namespace"}, user},
{"deny create", "unknown", "", map[string]string{"namespace":
"namespace"}, deny},
- {"forced exist", defQ, defQ,
map[string]string{siCommon.AppTagCreateForce: "true"}, user},
+ {"forced exist", providedQ, providedQ,
map[string]string{siCommon.AppTagCreateForce: "true"}, user},
{"forced and create", "unknown", "root.namespace",
map[string]string{siCommon.AppTagCreateForce: "true", "namespace":
"namespace"}, user},
{"forced and deny create", "unknown", common.RecoveryQueueFull,
map[string]string{siCommon.AppTagCreateForce: "true", "namespace":
"namespace"}, deny},
{"forced parent", "root.parent", common.RecoveryQueueFull,
map[string]string{siCommon.AppTagCreateForce: "true"}, user},
@@ -408,6 +411,133 @@ partitions:
}
})
}
+
+ // Update Queue structure
+ // root.default - defined
+ data = `
+partitions:
+ - name: default
+ queues:
+ - name: root
+ submitacl: "any-user"
+ queues:
+ - name: default
+ submitacl: "*"
+ - name: provided
+ submitacl: "*"
+ - name: acldeny
+ submitacl: " "
+ - name: parent
+ parent: true
+ submitacl: "*"
+`
+ err = initQueueStructure([]byte(data))
+ assert.NilError(t, err, "setting up the queue config failed")
+
+ tests = []struct {
+ name string
+ queue string
+ placed string
+ tags map[string]string
+ user security.UserGroup
+ }{
+ {"empty | defaulQ defined", "", defQ, tags, user},
+ {"provided unqualified | defaulQ defined", provided, providedQ,
tags, user},
+ {"provided qualified | defaulQ defined", defQ, defQ, tags,
user},
+ {"provided not exist | defaulQ defined", "unknown", defQ, tags,
user},
+ {"provided parent | defaulQ defined", "root.parent", defQ,
tags, user},
+ {"acl deny | defaulQ defined", "root.acldeny", defQ, tags,
deny},
+ {"create | defaulQ defined", "unknown", "root.namespace",
map[string]string{"namespace": "namespace"}, user},
+ {"deny create | defaulQ defined", "unknown", defQ,
map[string]string{"namespace": "namespace"}, deny},
+ {"forced exist | defaulQ defined", defQ, defQ,
map[string]string{siCommon.AppTagCreateForce: "true"}, user},
+ {"forced and create | defaulQ defined", "unknown",
"root.namespace", map[string]string{siCommon.AppTagCreateForce: "true",
"namespace": "namespace"}, user},
+ {"forced and deny create | defaulQ defined", "unknown",
common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true",
"namespace": "namespace"}, deny},
+ {"forced parent | defaulQ defined", "root.parent",
common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce:
"true"}, user},
+ {"forced acl deny | defaulQ defined", "root.acldeny",
common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce:
"true"}, deny},
+ {"forced not exist | defaulQ defined", "unknown",
common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce:
"true"}, user},
+ {"forced not exist acl deny | defaulQ defined", "unknown",
common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce:
"true"}, deny},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ app := newApplication("app1", "default", tt.queue,
tt.user, tt.tags, nil, "")
+ err = man.PlaceApplication(app)
+ if tt.placed == "" {
+ assert.Assert(t, errors.Is(err, RejectedError),
"unexpected error or no error returned")
+ } else {
+ assert.NilError(t, err, "unexpected placement
failure")
+ assert.Equal(t, tt.placed, app.GetQueuePath(),
"incorrect queue set")
+ }
+ })
+ }
+
+ // initialize queues with no root.default
+ // Add fixed placement rule to define custom default queue
+ data = `
+partitions:
+ - name: default
+ queues:
+ - name: root
+ submitacl: "any-user"
+ queues:
+ - name: custom
+ submitacl: "*"
+ - name: provided
+ submitacl: "*"
+ - name: acldeny
+ submitacl: " "
+ - name: parent
+ parent: true
+ submitacl: "*"
+`
+ err = initQueueStructure([]byte(data))
+ assert.NilError(t, err, "setting up the queue config failed")
+
+ // update the manager
+ rules = []configs.PlacementRule{
+ {Name: "provided",
+ Create: false},
+ {Name: "tag",
+ Value: "namespace",
+ Create: true},
+ {Name: "fixed",
+ Value: "root.custom",
+ Create: true},
+ }
+ man1 := NewPlacementManager(rules, queueFunc)
+ if man1 == nil {
+ t.Fatal("placement manager create failed")
+ }
+
+ tests = []struct {
+ name string
+ queue string
+ placed string
+ tags map[string]string
+ user security.UserGroup
+ }{
+ {"empty | custom defaulQ", "", customDefaultQ, tags, user},
+ {"provided unqualified | custom defaulQ", provided, providedQ,
tags, user},
+ {"provided qualified | custom defaulQ", providedQ, providedQ,
tags, user},
+ {"provided not exist | custom defaulQ", "unknown",
customDefaultQ, tags, user},
+ {"provided parent | custom defaulQ", "root.parent",
customDefaultQ, tags, user},
+ {"acl deny | custom defaulQ", "root.acldeny", customDefaultQ,
tags, deny},
+ {"create | custom defaulQ", "unknown", "root.namespace",
map[string]string{"namespace": "namespace"}, user},
+ {"deny create | custom defaulQ", "unknown", customDefaultQ,
map[string]string{"namespace": "namespace"}, deny},
+ {"forced exist | custom defaulQ", providedQ, providedQ,
map[string]string{siCommon.AppTagCreateForce: "true"}, user},
+ {"forced and create | custom defaulQ", "unknown",
"root.namespace", map[string]string{siCommon.AppTagCreateForce: "true",
"namespace": "namespace"}, user},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ app := newApplication("app1", "default", tt.queue,
tt.user, tt.tags, nil, "")
+ err = man1.PlaceApplication(app)
+ if tt.placed == "" {
+ assert.Assert(t, errors.Is(err, RejectedError),
"unexpected error or no error returned")
+ } else {
+ assert.NilError(t, err, "unexpected placement
failure")
+ assert.Equal(t, tt.placed, app.GetQueuePath(),
"incorrect queue set")
+ }
+ })
+ }
}
func TestManagerPlaceApp_Error(t *testing.T) {
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index c9fade0a..ec0f9490 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -54,6 +54,63 @@ const (
maxapplications = "maxapplications"
)
+func newBasePartitionNoRootDefault() (*PartitionContext, error) {
+ conf := configs.PartitionConfig{
+ Name: "test",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "root",
+ Parent: true,
+ SubmitACL: "*",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "custom",
+ Parent: false,
+ Queues: nil,
+ Limits: []configs.Limit{
+ {
+ Limit: "custom queue limit",
+ Users: []string{
+ "testuser",
+ },
+ Groups: []string{
+ "testgroup",
+ },
+ MaxResources: map[string]string{
+ "memory": "5",
+ "vcores": "5",
+ },
+ MaxApplications: 8,
+ },
+ },
+ },
+ },
+ Limits: []configs.Limit{
+ {
+ Limit: "root queue limit",
+ Users: []string{
+ "testuser",
+ },
+ Groups: []string{
+ "testgroup",
+ },
+ MaxResources: map[string]string{
+ "memory": "10",
+ "vcores": "10",
+ },
+ MaxApplications: 10,
+ },
+ },
+ },
+ },
+ PlacementRules: nil,
+ Limits: nil,
+ NodeSortPolicy: configs.NodeSortingPolicy{},
+ }
+
+ return newPartitionContext(conf, rmID, nil)
+}
+
func newBasePartition() (*PartitionContext, error) {
conf := configs.PartitionConfig{
Name: "test",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]