This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/branch-1.5 by this push:
new 5454d7d3 [YUNIKORN-2703] Core: Fallback to default queue if no placement rules match
5454d7d3 is described below
commit 5454d7d326638e971214c40c41c9527dd9c88ee1
Author: Craig Condit <[email protected]>
AuthorDate: Thu Jul 11 12:45:09 2024 -0500
[YUNIKORN-2703] Core: Fallback to default queue if no placement rules match
---
pkg/common/constants.go | 15 +-
pkg/scheduler/partition_test.go | 12 +-
pkg/scheduler/placement/placement.go | 15 ++
pkg/scheduler/placement/placement_test.go | 268 ++++++++++++++++++++++++++++++
pkg/scheduler/utilities_test.go | 57 +++++++
5 files changed, 357 insertions(+), 10 deletions(-)
diff --git a/pkg/common/constants.go b/pkg/common/constants.go
index 2b882ff7..7eae1c79 100644
--- a/pkg/common/constants.go
+++ b/pkg/common/constants.go
@@ -21,11 +21,12 @@ package common
const (
Empty = ""
- Wildcard = "*"
- Separator = ","
- Space = " "
- AnonymousUser = "nobody"
- AnonymousGroup = "nogroup"
- RecoveryQueue = "@recovery@"
- RecoveryQueueFull = "root." + RecoveryQueue
+ Wildcard = "*"
+ Separator = ","
+ Space = " "
+ AnonymousUser = "nobody"
+ AnonymousGroup = "nogroup"
+ RecoveryQueue = "@recovery@"
+ RecoveryQueueFull = "root." + RecoveryQueue
+ DefaultPlacementQueue = "root.default"
)
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index bd1b5900..1b55d19d 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -923,7 +923,7 @@ func TestAddApp(t *testing.T) {
}
func TestAddAppForced(t *testing.T) {
- partition, err := newBasePartition()
+ partition, err := newBasePartitionNoRootDefault()
assert.NilError(t, err, "partition create failed")
// add a new app to an invalid queue
@@ -1465,9 +1465,9 @@ func TestUpdateQueues(t *testing.T) {
}
func TestGetApplication(t *testing.T) {
- partition, err := newBasePartition()
+ partition, err := newBasePartitionNoRootDefault()
assert.NilError(t, err, "partition create failed")
- app := newApplication(appID1, "default", defQueue)
+ app := newApplication(appID1, "default", "root.custom")
err = partition.AddApplication(app)
assert.NilError(t, err, "no error expected while adding the application")
assert.Equal(t, partition.GetApplication(appID1), app, "partition failed to add app incorrect app returned")
@@ -1479,6 +1479,12 @@ func TestGetApplication(t *testing.T) {
if partition.GetApplication(appID2) != nil {
t.Fatal("partition added app incorrectly should have failed")
}
+
+ partition, err = newBasePartition()
+ assert.NilError(t, err, "partition create failed")
+ err = partition.AddApplication(app2)
+ assert.NilError(t, err, "no error expected while adding the application")
+ assert.Equal(t, partition.GetApplication(appID2), app2, "partition failed to add app incorrect app returned")
}
func TestGetQueue(t *testing.T) {
diff --git a/pkg/scheduler/placement/placement.go b/pkg/scheduler/placement/placement.go
index 08aac0c4..1d16ec70 100644
--- a/pkg/scheduler/placement/placement.go
+++ b/pkg/scheduler/placement/placement.go
@@ -24,6 +24,7 @@ import (
"go.uber.org/zap"
+ "github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
@@ -115,7 +116,9 @@ func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error {
var queueName string
var aclCheck bool
var err error
+ var remainingRules = len(m.rules)
for _, checkRule := range m.rules {
+ remainingRules--
log.Log(log.Config).Debug("Executing rule for placing application",
zap.String("ruleName", checkRule.getName()),
zap.String("application", app.ApplicationID))
@@ -127,6 +130,18 @@ func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error {
app.SetQueuePath("")
return err
}
+ // if no queue found even after the last rule, try to place in the default queue
+ if remainingRules == 0 && queueName == "" {
+ log.Log(log.Config).Info("No rule matched, placing application in default queue",
+ zap.String("application", app.ApplicationID),
+ zap.String("defaultQueue", common.DefaultPlacementQueue))
+ // get the queue object
+ queue := m.queueFn(common.DefaultPlacementQueue)
+ if queue != nil {
+ // default queue exist
+ queueName = common.DefaultPlacementQueue
+ }
+ }
// queueName returned make sure ACL allows access and create the queueName if not exist
if queueName != "" {
// get the queue object
diff --git a/pkg/scheduler/placement/placement_test.go b/pkg/scheduler/placement/placement_test.go
index 947fb94f..ed4d0e3b 100644
--- a/pkg/scheduler/placement/placement_test.go
+++ b/pkg/scheduler/placement/placement_test.go
@@ -23,9 +23,11 @@ import (
"gotest.tools/v3/assert"
+ "github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/security"
"github.com/apache/yunikorn-core/pkg/scheduler/placement/types"
+ siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
)
// basic test to check if no rules leave the manager unusable
@@ -256,3 +258,269 @@ partitions:
t.Errorf("parent queue: app should not have been placed, queue: '%s', error: %v", queueName, err)
}
}
+
+func TestForcePlaceApp(t *testing.T) {
+ const (
+ provided = "provided"
+ providedQ = "root.provided"
+ defQ = "root.default"
+ customDefaultQ = "root.custom"
+ )
+
+ // Create the structure for the test
+ // specifically no acl to allow on root
+ // root.default - undefined
+ data := `
+partitions:
+ - name: default
+ queues:
+ - name: root
+ submitacl: "any-user"
+ queues:
+ - name: provided
+ submitacl: "*"
+ - name: acldeny
+ submitacl: " "
+ - name: parent
+ parent: true
+ submitacl: "*"
+`
+ err := initQueueStructure([]byte(data))
+ assert.NilError(t, err, "setting up the queue config failed")
+ // update the manager
+ rules := []configs.PlacementRule{
+ {Name: "provided",
+ Create: false},
+ {Name: "tag",
+ Value: "namespace",
+ Create: true},
+ }
+ man := NewPlacementManager(rules, queueFunc)
+ if man == nil {
+ t.Fatal("placement manager create failed")
+ }
+
+ tags := make(map[string]string)
+ user := security.UserGroup{
+ User: "any-user",
+ Groups: []string{},
+ }
+ deny := security.UserGroup{
+ User: "deny-user",
+ Groups: []string{},
+ }
+ var tests = []struct {
+ name string
+ queue string
+ placed string
+ tags map[string]string
+ user security.UserGroup
+ }{
+ {"empty", "", "", tags, user},
+ {"provided unqualified", provided, providedQ, tags, user},
+ {"provided qualified", providedQ, providedQ, tags, user},
+ {"provided not exist", "unknown", "", tags, user},
+ {"provided parent", "root.parent", "", tags, user},
+ {"acl deny", "root.acldeny", "", tags, deny},
+ {"create", "unknown", "root.namespace", map[string]string{"namespace": "namespace"}, user},
+ {"deny create", "unknown", "", map[string]string{"namespace": "namespace"}, deny},
+ {"forced exist", providedQ, providedQ, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
+ {"forced and create", "unknown", "root.namespace", map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, user},
+ {"forced and deny create", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, deny},
+ {"forced parent", "root.parent", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
+ {"forced acl deny", "root.acldeny", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, deny},
+ {"forced not exist", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
+ {"forced not exist acl deny", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, deny},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ app := newApplication("app1", "default", tt.queue, tt.user, tt.tags, nil, "")
+ err = man.PlaceApplication(app)
+ if tt.placed == "" {
+ assert.ErrorContains(t, err, "rejected", "unexpected error or no error returned")
+ } else {
+ assert.NilError(t, err, "unexpected placement failure")
+ assert.Equal(t, tt.placed, app.GetQueuePath(), "incorrect queue set")
+ }
+ })
+ }
+
+ // Update Queue structure
+ // root.default - defined
+ data = `
+partitions:
+ - name: default
+ queues:
+ - name: root
+ submitacl: "any-user"
+ queues:
+ - name: default
+ submitacl: "*"
+ - name: provided
+ submitacl: "*"
+ - name: acldeny
+ submitacl: " "
+ - name: parent
+ parent: true
+ submitacl: "*"
+`
+ err = initQueueStructure([]byte(data))
+ assert.NilError(t, err, "setting up the queue config failed")
+
+ tests = []struct {
+ name string
+ queue string
+ placed string
+ tags map[string]string
+ user security.UserGroup
+ }{
+ {"empty | defaulQ defined", "", defQ, tags, user},
+ {"provided unqualified | defaulQ defined", provided, providedQ, tags, user},
+ {"provided qualified | defaulQ defined", defQ, defQ, tags, user},
+ {"provided not exist | defaulQ defined", "unknown", defQ, tags, user},
+ {"provided parent | defaulQ defined", "root.parent", defQ, tags, user},
+ {"acl deny | defaulQ defined", "root.acldeny", defQ, tags, deny},
+ {"create | defaulQ defined", "unknown", "root.namespace", map[string]string{"namespace": "namespace"}, user},
+ {"deny create | defaulQ defined", "unknown", defQ, map[string]string{"namespace": "namespace"}, deny},
+ {"forced exist | defaulQ defined", defQ, defQ, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
+ {"forced and create | defaulQ defined", "unknown", "root.namespace", map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, user},
+ {"forced and deny create | defaulQ defined", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, deny},
+ {"forced parent | defaulQ defined", "root.parent", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
+ {"forced acl deny | defaulQ defined", "root.acldeny", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, deny},
+ {"forced not exist | defaulQ defined", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
+ {"forced not exist acl deny | defaulQ defined", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, deny},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ app := newApplication("app1", "default", tt.queue, tt.user, tt.tags, nil, "")
+ err = man.PlaceApplication(app)
+ if tt.placed == "" {
+ assert.ErrorContains(t, err, "rejected", "unexpected error or no error returned")
+ } else {
+ assert.NilError(t, err, "unexpected placement failure")
+ assert.Equal(t, tt.placed, app.GetQueuePath(), "incorrect queue set")
+ }
+ })
+ }
+
+ // initialize queues with no root.default
+ // Add fixed placement rule to define custom default queue
+ data = `
+partitions:
+ - name: default
+ queues:
+ - name: root
+ submitacl: "any-user"
+ queues:
+ - name: custom
+ submitacl: "*"
+ - name: provided
+ submitacl: "*"
+ - name: acldeny
+ submitacl: " "
+ - name: parent
+ parent: true
+ submitacl: "*"
+`
+ err = initQueueStructure([]byte(data))
+ assert.NilError(t, err, "setting up the queue config failed")
+
+ // update the manager
+ rules = []configs.PlacementRule{
+ {Name: "provided",
+ Create: false},
+ {Name: "tag",
+ Value: "namespace",
+ Create: true},
+ {Name: "fixed",
+ Value: "root.custom",
+ Create: true},
+ }
+ man1 := NewPlacementManager(rules, queueFunc)
+ if man1 == nil {
+ t.Fatal("placement manager create failed")
+ }
+
+ tests = []struct {
+ name string
+ queue string
+ placed string
+ tags map[string]string
+ user security.UserGroup
+ }{
+ {"empty | custom defaulQ", "", customDefaultQ, tags, user},
+ {"provided unqualified | custom defaulQ", provided, providedQ, tags, user},
+ {"provided qualified | custom defaulQ", providedQ, providedQ, tags, user},
+ {"provided not exist | custom defaulQ", "unknown", customDefaultQ, tags, user},
+ {"provided parent | custom defaulQ", "root.parent", customDefaultQ, tags, user},
+ {"acl deny | custom defaulQ", "root.acldeny", customDefaultQ, tags, deny},
+ {"create | custom defaulQ", "unknown", "root.namespace", map[string]string{"namespace": "namespace"}, user},
+ {"deny create | custom defaulQ", "unknown", customDefaultQ, map[string]string{"namespace": "namespace"}, deny},
+ {"forced exist | custom defaulQ", providedQ, providedQ, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
+ {"forced and create | custom defaulQ", "unknown", "root.namespace", map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, user},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ app := newApplication("app1", "default", tt.queue, tt.user, tt.tags, nil, "")
+ err = man1.PlaceApplication(app)
+ if tt.placed == "" {
+ assert.ErrorContains(t, err, "rejected", "unexpected error or no error returned")
+ } else {
+ assert.NilError(t, err, "unexpected placement failure")
+ assert.Equal(t, tt.placed, app.GetQueuePath(), "incorrect queue set")
+ }
+ })
+ }
+}
+
+func TestManagerPlaceApp_Error(t *testing.T) {
+ // Create the structure for the test
+ data := `
+partitions:
+ - name: default
+ queues:
+ - name: root
+ queues:
+ - name: testparent
+ submitacl: "*"
+ queues:
+ - name: testchild
+ - name: fixed
+ submitacl: "other-user "
+ parent: true
+`
+ err := initQueueStructure([]byte(data))
+ assert.NilError(t, err, "setting up the queue config failed")
+ // basic info without rules, manager should init
+ man := NewPlacementManager(nil, queueFunc)
+ if man == nil {
+ t.Fatal("placement manager create failed")
+ }
+ rules := []configs.PlacementRule{
+ {
+ Name: "user",
+ Create: false,
+ Parent: &configs.PlacementRule{
+ Name: "user",
+ Create: false,
+ Parent: &configs.PlacementRule{
+ Name: "fixed",
+ Value: "testparent",
+ },
+ },
+ },
+ }
+ user := security.UserGroup{
+ User: "testchild",
+ Groups: []string{},
+ }
+ tags := make(map[string]string)
+ err = man.UpdateRules(rules)
+ assert.NilError(t, err, "failed to update existing manager")
+ app := newApplication("app1", "default", "", user, tags, nil, "")
+ err = man.PlaceApplication(app)
+ queueName := app.GetQueuePath()
+ if err == nil || queueName != "" {
+ t.Errorf("failed placed app, queue: '%s', error: %v", queueName, err)
+ }
+}
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index e7f15359..04421ac2 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -55,6 +55,63 @@ const (
maxapplications = "maxapplications"
)
+func newBasePartitionNoRootDefault() (*PartitionContext, error) {
+ conf := configs.PartitionConfig{
+ Name: "test",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "root",
+ Parent: true,
+ SubmitACL: "*",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "custom",
+ Parent: false,
+ Queues: nil,
+ Limits: []configs.Limit{
+ {
+ Limit: "custom queue limit",
+ Users: []string{
+ "testuser",
+ },
+ Groups: []string{
+ "testgroup",
+ },
+ MaxResources: map[string]string{
+ "memory": "5",
+ "vcores": "5",
+ },
+ MaxApplications: 8,
+ },
+ },
+ },
+ },
+ Limits: []configs.Limit{
+ {
+ Limit: "root queue limit",
+ Users: []string{
+ "testuser",
+ },
+ Groups: []string{
+ "testgroup",
+ },
+ MaxResources: map[string]string{
+ "memory": "10",
+ "vcores": "10",
+ },
+ MaxApplications: 10,
+ },
+ },
+ },
+ },
+ PlacementRules: nil,
+ Limits: nil,
+ NodeSortPolicy: configs.NodeSortingPolicy{},
+ }
+
+ return newPartitionContext(conf, rmID, nil)
+}
+
func newBasePartition() (*PartitionContext, error) {
conf := configs.PartitionConfig{
Name: "test",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]