This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 5a40def4 [YUNIKORN-1793] Handle placement rule and queue changes during initialisation (#601)
5a40def4 is described below
commit 5a40def426f1c0fc1073012637b3ea2fc5ecfdd4
Author: Craig Condit <[email protected]>
AuthorDate: Wed Aug 30 10:58:26 2023 -0500
[YUNIKORN-1793] Handle placement rule and queue changes during initialisation (#601)
Adds a tag to mark an application's creation as forced. This allows most
validation to be suppressed and, if necessary, redirects the app to a recovery queue.
Closes: #601
---
pkg/common/constants.go | 10 ++-
pkg/common/security/usergroup.go | 18 +++-
pkg/common/security/usergroup_test.go | 14 ++-
pkg/common/utils.go | 23 +++++
pkg/common/utils_test.go | 12 +++
pkg/scheduler/context.go | 2 +-
pkg/scheduler/objects/application.go | 23 ++---
pkg/scheduler/objects/application_test.go | 17 ++++
pkg/scheduler/objects/queue.go | 42 +++++++--
pkg/scheduler/objects/queue_test.go | 34 +++++++
pkg/scheduler/partition.go | 73 ++++++++-------
pkg/scheduler/partition_test.go | 122 ++++++++++++++++++++++++--
pkg/scheduler/placement/fixed_rule.go | 17 ++--
pkg/scheduler/placement/fixed_rule_test.go | 29 ++++--
pkg/scheduler/placement/placement.go | 74 ++++++----------
pkg/scheduler/placement/placement_test.go | 101 ++++++++-------------
pkg/scheduler/placement/provided_rule.go | 20 +++--
pkg/scheduler/placement/provided_rule_test.go | 32 ++++---
pkg/scheduler/placement/recovery_rule.go | 58 ++++++++++++
pkg/scheduler/placement/recovery_rule_test.go | 80 +++++++++++++++++
pkg/scheduler/placement/rule.go | 3 +-
pkg/scheduler/placement/rule_test.go | 13 ++-
pkg/scheduler/placement/tag_rule.go | 19 ++--
pkg/scheduler/placement/tag_rule_test.go | 42 ++++++---
pkg/scheduler/placement/testrule.go | 8 +-
pkg/scheduler/placement/types/types.go | 1 +
pkg/scheduler/placement/user_rule.go | 17 ++--
pkg/scheduler/placement/user_rule_test.go | 29 ++++--
pkg/scheduler/utilities_test.go | 5 ++
29 files changed, 670 insertions(+), 268 deletions(-)
diff --git a/pkg/common/constants.go b/pkg/common/constants.go
index b0b3a584..2b882ff7 100644
--- a/pkg/common/constants.go
+++ b/pkg/common/constants.go
@@ -21,7 +21,11 @@ package common
const (
Empty = ""
- Wildcard = "*"
- Separator = ","
- Space = " "
+ Wildcard = "*"
+ Separator = ","
+ Space = " "
+ AnonymousUser = "nobody"
+ AnonymousGroup = "nogroup"
+ RecoveryQueue = "@recovery@"
+ RecoveryQueueFull = "root." + RecoveryQueue
)
diff --git a/pkg/common/security/usergroup.go b/pkg/common/security/usergroup.go
index e381204d..124521bd 100644
--- a/pkg/common/security/usergroup.go
+++ b/pkg/common/security/usergroup.go
@@ -26,6 +26,7 @@ import (
"go.uber.org/zap"
+ "github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -122,14 +123,25 @@ func (c *UserGroupCache) resetCache() {
c.ugs = make(map[string]*UserGroup)
}
-func (c *UserGroupCache) ConvertUGI(ugi *si.UserGroupInformation) (UserGroup,
error) {
+func (c *UserGroupCache) ConvertUGI(ugi *si.UserGroupInformation, force bool)
(UserGroup, error) {
// check if we have a user to convert
if ugi == nil || ugi.User == "" {
- return UserGroup{}, fmt.Errorf("empty user cannot resolve")
+ if force {
+ // app creation is forced, so we need to synthesize a
user / group
+ ugi.User = common.AnonymousUser
+ ugi.Groups = []string{common.AnonymousGroup}
+ } else {
+ return UserGroup{}, fmt.Errorf("empty user cannot
resolve")
+ }
}
// try to resolve the user if group info is empty otherwise we just
convert
if len(ugi.Groups) == 0 {
- return c.GetUserGroup(ugi.User)
+ ug, err := c.GetUserGroup(ugi.User)
+ if force && (err != nil || ug.failed) {
+ ugi.Groups = []string{common.AnonymousGroup}
+ } else {
+ return ug, err
+ }
}
// If groups are already present we should just convert
newUG := UserGroup{User: ugi.User}
diff --git a/pkg/common/security/usergroup_test.go
b/pkg/common/security/usergroup_test.go
index 279dadf2..9eef2c6e 100644
--- a/pkg/common/security/usergroup_test.go
+++ b/pkg/common/security/usergroup_test.go
@@ -198,13 +198,13 @@ func TestConvertUGI(t *testing.T) {
User: "",
Groups: nil,
}
- ug, err := testCache.ConvertUGI(ugi)
+ ug, err := testCache.ConvertUGI(ugi, false)
if err == nil {
t.Errorf("empty user convert should have failed and did not:
%v", ug)
}
// try known user without groups
ugi.User = "testuser1"
- ug, err = testCache.ConvertUGI(ugi)
+ ug, err = testCache.ConvertUGI(ugi, false)
if err != nil {
t.Errorf("known user, no groups, convert should not have
failed: %v", err)
}
@@ -213,15 +213,21 @@ func TestConvertUGI(t *testing.T) {
}
// try unknown user without groups
ugi.User = "unknown"
- ug, err = testCache.ConvertUGI(ugi)
+ ug, err = testCache.ConvertUGI(ugi, false)
if err == nil {
t.Errorf("unknown user, no groups, convert should have failed:
%v", ug)
}
+ // try empty user when forced
+ ugi.User = ""
+ ug, err = testCache.ConvertUGI(ugi, true)
+ if err != nil {
+ t.Errorf("empty user but forced, convert should not have
failed: %v", err)
+ }
// try unknown user with groups
ugi.User = "unknown2"
group := "passedin"
ugi.Groups = []string{group}
- ug, err = testCache.ConvertUGI(ugi)
+ ug, err = testCache.ConvertUGI(ugi, false)
if err != nil {
t.Errorf("unknown user with groups, convert should not have
failed: %v", err)
}
diff --git a/pkg/common/utils.go b/pkg/common/utils.go
index c8437a8d..bb4bd330 100644
--- a/pkg/common/utils.go
+++ b/pkg/common/utils.go
@@ -191,6 +191,29 @@ func IsAllowPreemptOther(policy *si.PreemptionPolicy) bool
{
return policy != nil && policy.AllowPreemptOther
}
+// IsAppCreationForced returns true if the application creation is triggered
by the shim
+// reporting an existing allocation. In this case, it needs to be accepted
regardless
+// of whether it maps to a valid queue.
+func IsAppCreationForced(tags map[string]string) bool {
+ tagVal := ""
+ for key, val := range tags {
+ if strings.EqualFold(key, interfaceCommon.AppTagCreateForce) {
+ tagVal = val
+ break
+ }
+ }
+ result, err := strconv.ParseBool(tagVal)
+ if err != nil {
+ return false
+ }
+ return result
+}
+
+// IsRecoveryQueue returns true if the given queue represents the recovery
queue
+func IsRecoveryQueue(queueName string) bool {
+ return strings.EqualFold(queueName, RecoveryQueueFull)
+}
+
// ZeroTimeInUnixNano return the unix nano or nil if the time is zero.
func ZeroTimeInUnixNano(t time.Time) *int64 {
if t.IsZero() {
diff --git a/pkg/common/utils_test.go b/pkg/common/utils_test.go
index 1aa86206..d97fe637 100644
--- a/pkg/common/utils_test.go
+++ b/pkg/common/utils_test.go
@@ -161,6 +161,18 @@ func TestAllowPreemptOther(t *testing.T) {
assert.Check(t,
!IsAllowPreemptOther(&si.PreemptionPolicy{AllowPreemptOther: false}), "Preempt
other should not be allowed if policy does not allow")
}
+func TestIsAppCreationForced(t *testing.T) {
+ assert.Check(t, !IsAppCreationForced(nil), "nil tags should not result
in forced app creation")
+ tags := make(map[string]string)
+ assert.Check(t, !IsAppCreationForced(tags), "empty tags should not
result in forced app creation")
+ tags[common.AppTagCreateForce] = "false"
+ assert.Check(t, !IsAppCreationForced(tags), "false creation tag should
not result in forced app creation")
+ tags[common.AppTagCreateForce] = "invalid"
+ assert.Check(t, !IsAppCreationForced(tags), "invalid creation tag
should not result in forced app creation")
+ tags[common.AppTagCreateForce] = "true"
+ assert.Check(t, IsAppCreationForced(tags), "creation tag should result
in forced app creation")
+}
+
func TestConvertSITimeoutWithAdjustment(t *testing.T) {
created := time.Now().Unix() - 600
defaultTimeout := 15 * time.Minute
diff --git a/pkg/scheduler/context.go b/pkg/scheduler/context.go
index c944d886..95e0fb21 100644
--- a/pkg/scheduler/context.go
+++ b/pkg/scheduler/context.go
@@ -504,7 +504,7 @@ func (cc *ClusterContext)
handleRMUpdateApplicationEvent(event *rmevent.RMUpdate
}
// convert and resolve the user: cache can be set per partition
// need to do this before we create the application
- ugi, err := partition.convertUGI(app.Ugi)
+ ugi, err := partition.convertUGI(app.Ugi,
common.IsAppCreationForced(app.Tags))
if err != nil {
rejectedApps = append(rejectedApps,
&si.RejectedApplication{
ApplicationID: app.ApplicationID,
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index 3f58c1a7..c4fe10b1 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -70,20 +70,20 @@ type StateLogEntry struct {
}
type Application struct {
- ApplicationID string
- Partition string
- SubmissionTime time.Time
+ ApplicationID string // application ID
+ Partition string // partition Name
+ SubmissionTime time.Time // time application was submitted
+ tags map[string]string // application tags used in scheduling
- // Private fields need protection
+ // Private mutable fields need protection
queuePath string
queue *Queue // queue the application is
running in
pending *resources.Resource // pending resources from
asks for the app
reservations map[string]*reservation // a map of reservations
requests map[string]*AllocationAsk // a map of asks
- sortedRequests sortedRequests
- user security.UserGroup // owner of the application
- tags map[string]string // application tags used in
scheduling
- allocatedResource *resources.Resource // total allocated resources
+ sortedRequests sortedRequests // list of requests
pre-sorted
+ user security.UserGroup // owner of the application
+ allocatedResource *resources.Resource // total allocated resources
usedResource *resources.UsedResource // keep track of resource usage of
the application
@@ -1877,9 +1877,6 @@ func (sa *Application) GetUser() security.UserGroup {
// Get a tag from the application
// Note: tags are not case sensitive
func (sa *Application) GetTag(tag string) string {
- sa.RLock()
- defer sa.RUnlock()
-
tagVal := ""
for key, val := range sa.tags {
if strings.EqualFold(key, tag) {
@@ -1890,6 +1887,10 @@ func (sa *Application) GetTag(tag string) string {
return tagVal
}
+func (sa *Application) IsCreateForced() bool {
+ return common.IsAppCreationForced(sa.tags)
+}
+
func (sa *Application) SetTerminatedCallback(callback func(appID string)) {
sa.Lock()
defer sa.Unlock()
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index ef515ec7..cda3de42 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -1195,6 +1195,23 @@ func TestGetTag(t *testing.T) {
assert.Equal(t, tag, "test value", "expected tag value")
}
+func TestIsCreateForced(t *testing.T) {
+ app := newApplicationWithTags(appID1, "default", "root.a", nil)
+ assert.Check(t, !app.IsCreateForced(), "found forced app but tags nil")
+ tags := make(map[string]string)
+ app = newApplicationWithTags(appID1, "default", "root.a", tags)
+ assert.Check(t, !app.IsCreateForced(), "found forced app but tags
empty")
+ tags[siCommon.AppTagCreateForce] = "false"
+ app = newApplicationWithTags(appID1, "default", "root.a", tags)
+ assert.Check(t, !app.IsCreateForced(), "found forced app but forced tag
was false")
+ tags[siCommon.AppTagCreateForce] = "unknown"
+ app = newApplicationWithTags(appID1, "default", "root.a", tags)
+ assert.Check(t, !app.IsCreateForced(), "found forced app but forced tag
was invalid")
+ tags[siCommon.AppTagCreateForce] = "true"
+ app = newApplicationWithTags(appID1, "default", "root.a", tags)
+ assert.Check(t, app.IsCreateForced(), "found unforced app but forced
tag was set")
+}
+
func TestOnStatusChangeCalled(t *testing.T) {
app, testHandler := newApplicationWithHandler(appID1, "default",
"root.a")
assert.Equal(t, New.String(), app.CurrentState(), "new app not in New
state")
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 902cfe20..511488f2 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -20,6 +20,7 @@ package objects
import (
"context"
+ "errors"
"fmt"
"strconv"
"strings"
@@ -29,6 +30,7 @@ import (
"github.com/looplab/fsm"
"go.uber.org/zap"
+ "github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/common/security"
@@ -39,7 +41,7 @@ import (
"github.com/apache/yunikorn-core/pkg/scheduler/policies"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
- "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
+ siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
)
var (
@@ -146,6 +148,25 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent
*Queue) (*Queue, error)
return sq, nil
}
+// NewRecoveryQueue creates a recovery queue if it does not exist. The
recovery queue
+// is a dynamic queue, but has an invalid name so that it cannot be directly
referenced.
+func NewRecoveryQueue(parent *Queue) (*Queue, error) {
+ if parent == nil {
+ return nil, errors.New("recovery queue cannot be created with
nil parent")
+ }
+ if parent.GetQueuePath() != configs.RootQueue {
+ return nil, fmt.Errorf("recovery queue cannot be created with
non-root parent: %s", parent.GetQueuePath())
+ }
+ queue, err := newDynamicQueueInternal(common.RecoveryQueue, true,
parent)
+ if err == nil {
+ queue.Lock()
+ defer queue.Unlock()
+ queue.submitACL = security.ACL{}
+ queue.sortType = policies.FifoSortPolicy
+ }
+ return queue, err
+}
+
// NewDynamicQueue creates a new queue to be added to the system based on the
placement rules
// A dynamically added queue can never be the root queue so parent must be set
// lock free as it cannot be referenced yet
@@ -158,6 +179,10 @@ func NewDynamicQueue(name string, leaf bool, parent
*Queue) (*Queue, error) {
if !configs.QueueNameRegExp.MatchString(name) {
return nil, fmt.Errorf("invalid queue name '%s', a name must
only have alphanumeric characters, - or _, and be no longer than 64
characters", name)
}
+ return newDynamicQueueInternal(name, leaf, parent)
+}
+
+func newDynamicQueueInternal(name string, leaf bool, parent *Queue) (*Queue,
error) {
sq := newBlankQueue()
sq.Name = strings.ToLower(name)
sq.QueuePath = parent.QueuePath + configs.DOT + sq.Name
@@ -385,12 +410,15 @@ func (sq *Queue) setTemplate(conf configs.ChildTemplate)
error {
func (sq *Queue) UpdateQueueProperties() {
sq.Lock()
defer sq.Unlock()
-
+ if common.IsRecoveryQueue(sq.QueuePath) {
+ // recovery queue properties should never be updated
+ sq.sortType = policies.FifoSortPolicy
+ return
+ }
if !sq.isLeaf {
// set the sorting type for parent queues
sq.sortType = policies.FairSortPolicy
}
-
// walk over all properties and process
var err error
for key, value := range sq.properties {
@@ -536,6 +564,10 @@ func (sq *Queue) GetPreemptionDelay() time.Duration {
// The check is performed recursively: i.e. access to the parent allows access
to this queue.
// This will check both submitACL and adminACL.
func (sq *Queue) CheckSubmitAccess(user security.UserGroup) bool {
+ if common.IsRecoveryQueue(sq.QueuePath) {
+ // recovery queue can never pass ACL checks
+ return false
+ }
sq.RLock()
allow := sq.submitACL.CheckAccess(user) || sq.adminACL.CheckAccess(user)
sq.RUnlock()
@@ -681,9 +713,9 @@ func (sq *Queue) AddApplication(app *Application) {
sq.queueEvents.sendNewApplicationEvent(appID)
// YUNIKORN-199: update the quota from the namespace
// get the tag with the quota
- quota := app.GetTag(common.AppTagNamespaceResourceQuota)
+ quota := app.GetTag(siCommon.AppTagNamespaceResourceQuota)
// get the tag with the guaranteed resource
- guaranteed := app.GetTag(common.AppTagNamespaceResourceGuaranteed)
+ guaranteed := app.GetTag(siCommon.AppTagNamespaceResourceGuaranteed)
if quota == "" && guaranteed == "" {
return
}
diff --git a/pkg/scheduler/objects/queue_test.go
b/pkg/scheduler/objects/queue_test.go
index dceb138a..bd492a6d 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -2302,6 +2302,40 @@ func TestNewConfiguredQueue(t *testing.T) {
assert.Assert(t, childNonLeaf.maxResource == nil)
}
+func TestNewRecoveryQueue(t *testing.T) {
+ var err error
+ if _, err = NewRecoveryQueue(nil); err == nil {
+ t.Fatalf("recovery queue creation should fail with nil parent")
+ }
+
+ parent, err := createManagedQueueWithProps(nil, "parent", true, nil,
nil)
+ assert.NilError(t, err, "failed to create queue: %v", err)
+ if _, err = NewRecoveryQueue(parent); err == nil {
+ t.Fatalf("recovery queue creation should fail with non-root
parent")
+ }
+
+ parentConfig := configs.QueueConfig{
+ Name: "root",
+ Parent: true,
+ Properties: map[string]string{configs.ApplicationSortPolicy:
"fair"},
+ ChildTemplate: configs.ChildTemplate{Properties:
map[string]string{configs.ApplicationSortPolicy: "fair"}},
+ }
+ parent, err = NewConfiguredQueue(parentConfig, nil)
+ assert.NilError(t, err, "failed to create queue: %v", err)
+ recoveryQueue, err := NewRecoveryQueue(parent)
+ assert.NilError(t, err, "failed to create recovery queue: %v", err)
+ assert.Equal(t, common.RecoveryQueueFull, recoveryQueue.GetQueuePath(),
"wrong queue name")
+ assert.Equal(t, policies.FifoSortPolicy, recoveryQueue.getSortType(),
"wrong sort type")
+}
+
+func TestNewDynamicQueueDoesNotCreateRecovery(t *testing.T) {
+ parent, err := createRootQueue(nil)
+ assert.NilError(t, err, "failed to create queue: %v", err)
+ if _, err := NewDynamicQueue(common.RecoveryQueue, true, parent); err
== nil {
+ t.Fatalf("invalid recovery queue %s was created",
common.RecoveryQueueFull)
+ }
+}
+
func TestNewDynamicQueue(t *testing.T) {
parent, err := createManagedQueueWithProps(nil, "parent", true, nil,
nil)
assert.NilError(t, err, "failed to create queue: %v", err)
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 101200de..8bf2786e 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -157,22 +157,13 @@ func (pc *PartitionContext) updatePartitionDetails(conf
configs.PartitionConfig)
if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue {
return fmt.Errorf("partition cannot be created without root
queue")
}
-
- if pc.placementManager.IsInitialised() {
- log.Log(log.SchedPartition).Info("Updating placement manager
rules on config reload")
- err := pc.placementManager.UpdateRules(conf.PlacementRules)
- if err != nil {
- log.Log(log.SchedPartition).Info("New placement rules
not activated, config reload failed", zap.Error(err))
- return err
- }
- pc.rules = &conf.PlacementRules
- } else {
- log.Log(log.SchedPartition).Info("Creating new placement
manager on config reload")
- pc.rules = &conf.PlacementRules
- // We need to pass in the locked version of the GetQueue
function.
- // Placing an application will not have a lock on the partition
context.
- pc.placementManager = placement.NewPlacementManager(*pc.rules,
pc.GetQueue)
+ log.Log(log.SchedPartition).Info("Updating placement manager rules on
config reload")
+ err := pc.placementManager.UpdateRules(conf.PlacementRules)
+ if err != nil {
+ log.Log(log.SchedPartition).Info("New placement rules not
activated, config reload failed", zap.Error(err))
+ return err
}
+ pc.rules = &conf.PlacementRules
pc.updateNodeSortingPolicy(conf)
// start at the root: there is only one queue
queueConf := conf.Queues[0]
@@ -308,38 +299,41 @@ func (pc *PartitionContext) AddApplication(app
*objects.Application) error {
}
// Put app under the queue
- queueName := app.GetQueuePath()
pm := pc.getPlacementManager()
- if pm.IsInitialised() {
- err := pm.PlaceApplication(app)
- if err != nil {
- return fmt.Errorf("failed to place application %s: %v",
appID, err)
- }
- queueName = app.GetQueuePath()
- if queueName == "" {
- return fmt.Errorf("application rejected by placement
rules: %s", appID)
- }
+ err := pm.PlaceApplication(app)
+ if err != nil {
+ return fmt.Errorf("failed to place application %s: %v", appID,
err)
}
+ queueName := app.GetQueuePath()
+ if queueName == "" {
+ return fmt.Errorf("application rejected by placement rules:
%s", appID)
+ }
+
// lock the partition and make the last change: we need to do this
before creating the queues.
// queue cleanup might otherwise remove the queue again before we can
add the application
pc.Lock()
defer pc.Unlock()
// we have a queue name either from placement or direct, get the queue
queue := pc.getQueueInternal(queueName)
+
+ // create the queue if necessary
if queue == nil {
- // queue must exist if not using placement rules
- if !pm.IsInitialised() {
- return fmt.Errorf("application '%s' rejected, cannot
create queue '%s' without placement rules", appID, queueName)
- }
- // with placement rules the hierarchy might not exist so try
and create it
var err error
- queue, err = pc.createQueue(queueName, app.GetUser())
- if err != nil {
- return fmt.Errorf("failed to create rule based queue %s
for application %s", queueName, appID)
+ if common.IsRecoveryQueue(queueName) {
+ queue, err = pc.createRecoveryQueue()
+ if err != nil {
+ return fmt.Errorf("failed to create recovery
queue %s for application %s", common.RecoveryQueueFull, appID)
+ }
+ } else {
+ queue, err = pc.createQueue(queueName, app.GetUser())
+ if err != nil {
+ return fmt.Errorf("failed to create rule based
queue %s for application %s", queueName, appID)
+ }
}
}
- // check the queue: is a leaf queue with submit access
- if !queue.IsLeafQueue() || !queue.CheckSubmitAccess(app.GetUser()) {
+
+ // check the queue: is a leaf queue
+ if !queue.IsLeafQueue() {
return fmt.Errorf("failed to find queue %s for application %s",
queueName, appID)
}
@@ -486,6 +480,11 @@ func (pc *PartitionContext) GetPartitionQueues()
dao.PartitionQueueDAOInfo {
return PartitionQueueDAOInfo
}
+// Create the recovery queue.
+func (pc *PartitionContext) createRecoveryQueue() (*objects.Queue, error) {
+ return objects.NewRecoveryQueue(pc.root)
+}
+
// Create a queue with full hierarchy. This is called when a new queue is
created from a placement rule.
// The final leaf queue does not exist otherwise we would not get here.
// This means that at least 1 queue (a leaf queue) will be created
@@ -1186,10 +1185,10 @@ func (pc *PartitionContext) addAllocation(alloc
*objects.Allocation) error {
return nil
}
-func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation)
(security.UserGroup, error) {
+func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation, forced
bool) (security.UserGroup, error) {
pc.RLock()
defer pc.RUnlock()
- return pc.userGroupCache.ConvertUGI(ugi)
+ return pc.userGroupCache.ConvertUGI(ugi, forced)
}
// calculate overall nodes resource usage and returns a map as the result,
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 88381404..9005929d 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -118,7 +118,6 @@ func TestNewPartition(t *testing.T) {
if partition.root.QueuePath != "root" {
t.Fatal("partition root queue not set as expected")
}
- assert.Assert(t, !partition.placementManager.IsInitialised(),
"partition should not have initialised placement manager")
}
func TestNewWithPlacement(t *testing.T) {
@@ -143,8 +142,7 @@ func TestNewWithPlacement(t *testing.T) {
}
partition, err := newPartitionContext(confWith, rmID, nil)
assert.NilError(t, err, "test partition create failed with error")
- assert.Assert(t, partition.placementManager.IsInitialised(), "partition
should have initialised placement manager")
- assert.Equal(t, len(*partition.rules), 1, "Placement rules not set as
expected ")
+ assert.Equal(t, len(*partition.rules), 1, "Placement rules not set as
expected")
// add a rule and check if it is updated
confWith = configs.PartitionConfig{
@@ -170,8 +168,7 @@ func TestNewWithPlacement(t *testing.T) {
}
err = partition.updatePartitionDetails(confWith)
assert.NilError(t, err, "update partition failed unexpected with error")
- assert.Assert(t, partition.placementManager.IsInitialised(), "partition
should have initialised placement manager")
- assert.Equal(t, len(*partition.rules), 2, "Placement rules not updated
as expected ")
+ assert.Equal(t, len(*partition.rules), 2, "Placement rules not updated
as expected")
// update to turn off placement manager
conf := configs.PartitionConfig{
@@ -187,14 +184,12 @@ func TestNewWithPlacement(t *testing.T) {
}
err = partition.updatePartitionDetails(conf)
assert.NilError(t, err, "update partition failed unexpected with error")
- assert.Assert(t, !partition.placementManager.IsInitialised(),
"partition should not have initialised placement manager")
- assert.Equal(t, len(*partition.rules), 0, "Placement rules not updated
as expected ")
+ assert.Equal(t, len(*partition.rules), 0, "Placement rules not updated
as expected")
// set the old config back this should turn on the placement again
err = partition.updatePartitionDetails(confWith)
assert.NilError(t, err, "update partition failed unexpected with error")
- assert.Assert(t, partition.placementManager.IsInitialised(), "partition
should have initialised placement manager")
- assert.Equal(t, len(*partition.rules), 2, "Placement rules not updated
as expected ")
+ assert.Equal(t, len(*partition.rules), 2, "Placement rules not updated
as expected")
}
func TestAddNode(t *testing.T) {
@@ -928,6 +923,115 @@ func TestAddApp(t *testing.T) {
}
}
+func TestAddAppForced(t *testing.T) {
+ partition, err := newBasePartition()
+ assert.NilError(t, err, "partition create failed")
+
+ // add a new app to an invalid queue
+ app := newApplication(appID1, "default", "root.invalid")
+ err = partition.AddApplication(app)
+ if err == nil || partition.getApplication(appID1) != nil {
+ t.Fatalf("add application to nonexistent queue should have
failed but did not")
+ }
+
+ // re-add the app, but mark it as forced. this should create the
recovery queue and assign the app to it
+ app = newApplicationTags(appID1, "default", "root.invalid",
map[string]string{siCommon.AppTagCreateForce: "true"})
+ err = partition.AddApplication(app)
+ assert.NilError(t, err, "app create failed")
+ partApp := partition.getApplication(appID1)
+ if partApp == nil {
+ t.Fatalf("app not found after adding to partition")
+ }
+ recoveryQueue := partition.GetQueue(common.RecoveryQueueFull)
+ if recoveryQueue == nil {
+ t.Fatalf("recovery queue not found")
+ }
+ assert.Equal(t, common.RecoveryQueueFull, partApp.GetQueuePath(),
"wrong queue path for app2")
+ assert.Check(t, recoveryQueue == partApp.GetQueue(), "wrong queue for
app")
+ assert.Equal(t, 1, len(recoveryQueue.GetCopyOfApps()), "wrong queue
length")
+
+ // add second forced app. this should use the existing recovery queue
rather than recreating it
+ app2 := newApplicationTags(appID2, "default", "root.invalid2",
map[string]string{siCommon.AppTagCreateForce: "true"})
+ err = partition.AddApplication(app2)
+ assert.NilError(t, err, "app2 create failed")
+ partApp2 := partition.getApplication(appID2)
+ if partApp2 == nil {
+ t.Fatalf("app2 not found after adding to partition")
+ }
+ assert.Equal(t, common.RecoveryQueueFull, partApp2.GetQueuePath(),
"wrong queue path for app2")
+ assert.Check(t, recoveryQueue == partApp2.GetQueue(), "wrong queue for
app2")
+ assert.Equal(t, 2, len(recoveryQueue.GetCopyOfApps()), "wrong queue
length")
+
+ // add third app (not forced), but referencing the recovery queue. this
should fail.
+ app3 := newApplication(appID3, "default", common.RecoveryQueueFull)
+ err = partition.AddApplication(app3)
+ if err == nil || partition.getApplication(appID3) != nil {
+ t.Fatalf("add app3 to recovery queue should have failed but did
not")
+ }
+
+ // re-add third app, but forced. This should succeed.
+ app3 = newApplicationTags(appID3, "default", common.RecoveryQueueFull,
map[string]string{siCommon.AppTagCreateForce: "true"})
+ err = partition.AddApplication(app3)
+ assert.NilError(t, err, "app3 create failed")
+ partApp3 := partition.getApplication(appID3)
+ if partApp3 == nil {
+ t.Fatalf("app3 not found after adding to partition")
+ }
+ assert.Equal(t, common.RecoveryQueueFull, partApp3.GetQueuePath(),
"wrong queue path for app3")
+ assert.Check(t, recoveryQueue == partApp3.GetQueue(), "wrong queue for
app3")
+ assert.Equal(t, 3, len(recoveryQueue.GetCopyOfApps()), "wrong queue
length")
+}
+
+func TestAddAppForcedWithPlacement(t *testing.T) {
+ confWith := configs.PartitionConfig{
+ Name: "test",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "root",
+ Parent: true,
+ SubmitACL: "*",
+ Queues: nil,
+ },
+ },
+ PlacementRules: []configs.PlacementRule{
+ {
+ Name: "tag",
+ Value: "queue",
+ Create: true,
+ },
+ },
+ Limits: nil,
+ NodeSortPolicy: configs.NodeSortingPolicy{},
+ }
+ partition, err := newPartitionContext(confWith, rmID, nil)
+ assert.NilError(t, err, "test partition create failed with error")
+
+ // add a new app using tag rule
+ app := newApplicationTags(appID1, "default", "",
map[string]string{"queue": "root.test"})
+ err = partition.AddApplication(app)
+ assert.NilError(t, err, "failed to add app to tagged queue")
+ assert.Equal(t, "root.test", app.GetQueuePath(), "app assigned to wrong
queue")
+
+ // add a second app without a tag rule
+ app2 := newApplicationTags(appID2, "default", "root.untagged",
map[string]string{})
+ err = partition.AddApplication(app2)
+ if err == nil || partition.getApplication(appID2) != nil {
+ t.Fatalf("add app2 to fixed queue should have failed but did
not")
+ }
+
+ // attempt to add the app again, but with forced addition
+ app2 = newApplicationTags(appID2, "default", "root.untagged",
map[string]string{siCommon.AppTagCreateForce: "true"})
+ err = partition.AddApplication(app2)
+ assert.NilError(t, err, "failed to add app2 to tagged queue")
+ assert.Equal(t, common.RecoveryQueueFull, app2.GetQueuePath(), "app2
assigned to wrong queue")
+
+ // add a third app, but with the recovery queue tagged
+ app3 := newApplicationTags(appID3, "default", common.RecoveryQueueFull,
map[string]string{siCommon.AppTagCreateForce: "true"})
+ err = partition.AddApplication(app3)
+ assert.NilError(t, err, "failed to add app3 to tagged queue")
+ assert.Equal(t, common.RecoveryQueueFull, app3.GetQueuePath(), "app2
assigned to wrong queue")
+}
+
func TestAddAppTaskGroup(t *testing.T) {
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")
diff --git a/pkg/scheduler/placement/fixed_rule.go
b/pkg/scheduler/placement/fixed_rule.go
index 8dce0bba..0dce3847 100644
--- a/pkg/scheduler/placement/fixed_rule.go
+++ b/pkg/scheduler/placement/fixed_rule.go
@@ -63,30 +63,31 @@ func (fr *fixedRule) initialise(conf configs.PlacementRule)
error {
return err
}
-func (fr *fixedRule) placeApplication(app *objects.Application, queueFn
func(string) *objects.Queue) (string, error) {
+func (fr *fixedRule) placeApplication(app *objects.Application, queueFn
func(string) *objects.Queue) (string, bool, error) {
// before anything run the filter
if !fr.filter.allowUser(app.GetUser()) {
log.Log(log.Config).Debug("Fixed rule filtered",
zap.String("application", app.ApplicationID),
zap.Any("user", app.GetUser()),
zap.String("queueName", fr.queue))
- return "", nil
+ return "", true, nil
}
var parentName string
+ var aclCheck = true
var err error
queueName := fr.queue
// if the fixed queue is already fully qualified skip the parent check
if !fr.qualified {
// run the parent rule if set
if fr.parent != nil {
- parentName, err = fr.parent.placeApplication(app,
queueFn)
+ parentName, aclCheck, err =
fr.parent.placeApplication(app, queueFn)
// failed parent rule, fail this rule
if err != nil {
- return "", err
+ return "", aclCheck, err
}
// rule did not return a parent: this could be filter
or create flag related
if parentName == "" {
- return "", nil
+ return "", aclCheck, nil
}
// check if this is a parent queue and qualify it
if !strings.HasPrefix(parentName,
configs.RootQueue+configs.DOT) {
@@ -95,7 +96,7 @@ func (fr *fixedRule) placeApplication(app
*objects.Application, queueFn func(str
// if the parent queue exists it cannot be a leaf
parentQueue := queueFn(parentName)
if parentQueue != nil && parentQueue.IsLeafQueue() {
- return "", fmt.Errorf("parent rule returned a
leaf queue: %s", parentName)
+ return "", aclCheck, fmt.Errorf("parent rule
returned a leaf queue: %s", parentName)
}
}
// the parent is set from the rule otherwise set it to the root
@@ -112,10 +113,10 @@ func (fr *fixedRule) placeApplication(app
*objects.Application, queueFn func(str
queue := queueFn(queueName)
// if we cannot create the queue must exist
if !fr.create && queue == nil {
- return "", nil
+ return "", aclCheck, nil
}
log.Log(log.Config).Info("Fixed rule application placed",
zap.String("application", app.ApplicationID),
zap.String("queue", queueName))
- return queueName, nil
+ return queueName, aclCheck, nil
}
diff --git a/pkg/scheduler/placement/fixed_rule_test.go
b/pkg/scheduler/placement/fixed_rule_test.go
index 5067f93e..c3e4e951 100644
--- a/pkg/scheduler/placement/fixed_rule_test.go
+++ b/pkg/scheduler/placement/fixed_rule_test.go
@@ -90,10 +90,12 @@ partitions:
t.Errorf("fixed rule create failed with queue name, err %v",
err)
}
var queue string
- queue, err = fr.placeApplication(app, queueFunc)
+ var aclCheck bool
+ queue, aclCheck, err = fr.placeApplication(app, queueFunc)
if queue != "root.testqueue" || err != nil {
t.Errorf("fixed rule failed to place queue in correct queue
'%s', err %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// fixed queue that exists directly in hierarchy
conf = configs.PlacementRule{
@@ -104,10 +106,11 @@ partitions:
if err != nil || fr == nil {
t.Errorf("fixed rule create failed with queue name, err %v",
err)
}
- queue, err = fr.placeApplication(app, queueFunc)
+ queue, aclCheck, err = fr.placeApplication(app, queueFunc)
if queue != "root.testparent.testchild" || err != nil {
t.Errorf("fixed rule failed to place queue in correct queue
'%s', err %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// fixed queue that does not exists
conf = configs.PlacementRule{
@@ -119,10 +122,11 @@ partitions:
if err != nil || fr == nil {
t.Errorf("fixed rule create failed with queue name, err %v",
err)
}
- queue, err = fr.placeApplication(app, queueFunc)
+ queue, aclCheck, err = fr.placeApplication(app, queueFunc)
if queue != "root.newqueue" || err != nil {
t.Errorf("fixed rule failed to place queue in to be created
queue '%s', err %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// trying to place in a parent queue should not fail: failure happens
on create in this case
conf = configs.PlacementRule{
@@ -133,10 +137,11 @@ partitions:
if err != nil || fr == nil {
t.Errorf("fixed rule create failed with queue name, err %v",
err)
}
- queue, err = fr.placeApplication(app, queueFunc)
+ queue, aclCheck, err = fr.placeApplication(app, queueFunc)
if queue != "root.testparent" || err != nil {
t.Errorf("fixed rule did fail with parent queue '%s', error
%v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// trying to place in a child using a parent
conf = configs.PlacementRule{
@@ -151,10 +156,11 @@ partitions:
if err != nil || fr == nil {
t.Errorf("fixed rule create failed with queue name, err %v",
err)
}
- queue, err = fr.placeApplication(app, queueFunc)
+ queue, aclCheck, err = fr.placeApplication(app, queueFunc)
if queue != "root.testparent.testchild" || err != nil {
t.Errorf("fixed rule with parent queue should not have failed
'%s', error %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
}
func TestFixedRuleParent(t *testing.T) {
@@ -184,10 +190,12 @@ func TestFixedRuleParent(t *testing.T) {
t.Errorf("fixed rule create failed with queue name, err %v",
err)
}
var queue string
- queue, err = fr.placeApplication(app, queueFunc)
+ var aclCheck bool
+ queue, aclCheck, err = fr.placeApplication(app, queueFunc)
if queue != "" || err != nil {
t.Errorf("fixed rule with create false for child should have
failed and gave '%s', error %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// trying to place in a child using a non creatable parent
conf = configs.PlacementRule{
@@ -204,10 +212,11 @@ func TestFixedRuleParent(t *testing.T) {
if err != nil || fr == nil {
t.Errorf("fixed rule create failed with queue name, err %v",
err)
}
- queue, err = fr.placeApplication(app, queueFunc)
+ queue, aclCheck, err = fr.placeApplication(app, queueFunc)
if queue != "" || err != nil {
t.Errorf("fixed rule with non existing parent queue should have
failed '%s', error %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// trying to place in a child using a creatable parent
conf = configs.PlacementRule{
@@ -224,10 +233,11 @@ func TestFixedRuleParent(t *testing.T) {
if err != nil || fr == nil {
t.Errorf("fixed rule create failed with queue name, err %v",
err)
}
- queue, err = fr.placeApplication(app, queueFunc)
+ queue, aclCheck, err = fr.placeApplication(app, queueFunc)
if queue != nameParentChild || err != nil {
t.Errorf("fixed rule with non existing parent queue should
created '%s', error %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// trying to place in a child using a parent which is defined as a leaf
conf = configs.PlacementRule{
@@ -243,8 +253,9 @@ func TestFixedRuleParent(t *testing.T) {
if err != nil || fr == nil {
t.Errorf("fixed rule create failed with queue name, err %v",
err)
}
- queue, err = fr.placeApplication(app, queueFunc)
+ queue, aclCheck, err = fr.placeApplication(app, queueFunc)
if queue != "" || err == nil {
t.Errorf("fixed rule with parent declared as leaf should have
failed '%s', error %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
}
diff --git a/pkg/scheduler/placement/placement.go
b/pkg/scheduler/placement/placement.go
index c4b6ae8d..b74a4865 100644
--- a/pkg/scheduler/placement/placement.go
+++ b/pkg/scheduler/placement/placement.go
@@ -28,29 +28,22 @@ import (
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
+ "github.com/apache/yunikorn-core/pkg/scheduler/placement/types"
)
type AppPlacementManager struct {
- name string
- rules []rule
- initialised bool
- queueFn func(string) *objects.Queue
+ rules []rule
+ queueFn func(string) *objects.Queue
sync.RWMutex
}
func NewPlacementManager(rules []configs.PlacementRule, queueFunc func(string)
*objects.Queue) *AppPlacementManager {
- m := &AppPlacementManager{}
- if queueFunc == nil {
- log.Log(log.Config).Info("Placement manager created without
queue function: not active")
- return m
+ m := &AppPlacementManager{
+ queueFn: queueFunc,
}
- m.queueFn = queueFunc
- if len(rules) > 0 {
- if err := m.initialise(rules); err != nil {
- log.Log(log.Config).Info("Placement manager created
without rules: not active",
- zap.Error(err))
- }
+ if err := m.initialise(rules); err != nil {
+ log.Log(log.Config).Error("Placement manager created without
rules: not active", zap.Error(err))
}
return m
}
@@ -58,32 +51,14 @@ func NewPlacementManager(rules []configs.PlacementRule,
queueFunc func(string) *
// Update the rules for an active placement manager
// Note that this will only be called when the manager is created earlier and
the config is updated.
func (m *AppPlacementManager) UpdateRules(rules []configs.PlacementRule) error
{
- if len(rules) > 0 {
- log.Log(log.Config).Info("Building new rule list for placement
manager")
- if err := m.initialise(rules); err != nil {
- log.Log(log.Config).Info("Placement manager rules not
reloaded",
- zap.Error(err))
- return err
- }
- }
- // if there are no rules in the config we should turn off the placement
manager
- if len(rules) == 0 && m.initialised {
- m.Lock()
- defer m.Unlock()
- log.Log(log.Config).Info("Placement manager rules removed on
config reload")
- m.initialised = false
- m.rules = make([]rule, 0)
+ log.Log(log.Config).Info("Reloading placement manager rules after configuration update")
+ if err := m.initialise(rules); err != nil {
+ log.Log(log.Config).Error("Placement manager rules not reloaded", zap.Error(err))
+ return err
}
return nil
}
-// Return the state of the placement manager
-func (m *AppPlacementManager) IsInitialised() bool {
- m.RLock()
- defer m.RUnlock()
- return m.initialised
-}
-
// Initialise the rules from a parsed config.
func (m *AppPlacementManager) initialise(rules []configs.PlacementRule) error {
log.Log(log.Config).Info("Building new rule list for placement manager")
@@ -94,14 +69,10 @@ func (m *AppPlacementManager) initialise(rules
[]configs.PlacementRule) error {
}
m.Lock()
defer m.Unlock()
- if m.queueFn == nil {
- return fmt.Errorf("placement manager queue function nil")
- }
log.Log(log.Config).Info("Activated rule set in placement manager")
m.rules = tempRules
// all done manager is initialised
- m.initialised = true
for rule := range m.rules {
log.Log(log.Config).Debug("rule set",
zap.Int("ruleNumber", rule),
@@ -114,9 +85,13 @@ func (m *AppPlacementManager) initialise(rules
[]configs.PlacementRule) error {
// If the rule set is correct and can be used the new set is returned.
// If any error is encountered a nil array is returned and the error set
func (m *AppPlacementManager) buildRules(rules []configs.PlacementRule)
([]rule, error) {
- // catch an empty list
+ // empty list should result in a single "provided" rule
if len(rules) == 0 {
- return nil, fmt.Errorf("placement manager rule list request is
empty")
+ log.Log(log.Config).Info("Placement manager configured without
rules: using implicit provided rule")
+ rules = []configs.PlacementRule{{
+ Name: types.Provided,
+ Create: false,
+ }}
}
// build temp list from new config
var newRules []rule
@@ -127,23 +102,24 @@ func (m *AppPlacementManager) buildRules(rules
[]configs.PlacementRule) ([]rule,
}
newRules = append(newRules, buildRule)
}
+ // ensure the recovery rule is always present
+ newRules = append(newRules, &recoveryRule{})
+
return newRules, nil
}
func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error
{
- // Placement manager not initialised cannot place application, just
return
m.RLock()
defer m.RUnlock()
- if !m.initialised {
- return nil
- }
+
var queueName string
+ var aclCheck bool
var err error
for _, checkRule := range m.rules {
log.Log(log.Config).Debug("Executing rule for placing
application",
zap.String("ruleName", checkRule.getName()),
zap.String("application", app.ApplicationID))
- queueName, err = checkRule.placeApplication(app, m.queueFn)
+ queueName, aclCheck, err = checkRule.placeApplication(app,
m.queueFn)
if err != nil {
log.Log(log.Config).Error("rule execution failed",
zap.String("ruleName", checkRule.getName()),
@@ -164,7 +140,7 @@ func (m *AppPlacementManager) PlaceApplication(app
*objects.Application) error {
queue = m.queueFn(current)
}
// Check if the user is allowed to submit to
this queueName, if not next rule
- if !queue.CheckSubmitAccess(app.GetUser()) {
+ if aclCheck &&
!queue.CheckSubmitAccess(app.GetUser()) {
log.Log(log.Config).Debug("Submit
access denied on queue",
zap.String("queueName",
queue.GetQueuePath()),
zap.String("ruleName",
checkRule.getName()),
@@ -185,7 +161,7 @@ func (m *AppPlacementManager) PlaceApplication(app
*objects.Application) error {
continue
}
// Check if the user is allowed to submit to
this queueName, if not next rule
- if !queue.CheckSubmitAccess(app.GetUser()) {
+ if aclCheck &&
!queue.CheckSubmitAccess(app.GetUser()) {
log.Log(log.Config).Debug("Submit
access denied on queue",
zap.String("queueName",
queueName),
zap.String("ruleName",
checkRule.getName()),
diff --git a/pkg/scheduler/placement/placement_test.go
b/pkg/scheduler/placement/placement_test.go
index 9eec2c17..947fb94f 100644
--- a/pkg/scheduler/placement/placement_test.go
+++ b/pkg/scheduler/placement/placement_test.go
@@ -25,57 +25,34 @@ import (
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/security"
+ "github.com/apache/yunikorn-core/pkg/scheduler/placement/types"
)
// basic test to check if no rules leave the manager unusable
func TestManagerNew(t *testing.T) {
// basic info without rules, manager should not init
- man := NewPlacementManager(nil, nil)
- if man.initialised {
- t.Error("Placement manager marked initialised without rules")
- }
- if man.IsInitialised() {
- t.Error("Placement manager marked initialised without rules")
- }
- if len(man.rules) != 0 {
- t.Error("Placement manager marked initialised without rules")
- }
-}
-
-func TestManagerNoFunc(t *testing.T) {
- // basic info without rules, manager should not init
- rules := []configs.PlacementRule{
- {Name: "test"},
- }
- man := NewPlacementManager(rules, nil)
- if man.initialised {
- t.Error("Placement manager marked initialised without queue
func")
- }
- if man.initialise(rules) == nil {
- t.Error("Placement manager should not initialise with nil
function")
- }
- if man.UpdateRules(rules) == nil {
- t.Error("Placement manager should not update with nil function")
- }
+ man := NewPlacementManager(nil, queueFunc)
+ assert.Equal(t, 2, len(man.rules), "wrong rule count for empty
placement manager")
+ assert.Equal(t, types.Provided, man.rules[0].getName(), "wrong name for
implicit provided rule")
+ assert.Equal(t, types.Recovery, man.rules[1].getName(), "wrong name for
implicit recovery rule")
}
func TestManagerInit(t *testing.T) {
- // basic info without rules, manager should not init no error
+ // basic info without rules, manager should implicitly init
man := NewPlacementManager(nil, queueFunc)
- if man.initialised {
- t.Error("Placement manager marked initialised without rules")
- }
- // try to init with empty list must error
+ assert.Equal(t, 2, len(man.rules), "wrong rule count for nil placement
manager")
+
+ // try to init with empty list should do the same
var rules []configs.PlacementRule
err := man.initialise(rules)
- if err == nil || man.initialised {
- t.Error("initialise without rules should have failed")
- }
+ assert.NilError(t, err, "Failed to initialize empty placement rules")
+ assert.Equal(t, 2, len(man.rules), "wrong rule count for empty
placement manager")
+
rules = []configs.PlacementRule{
{Name: "unknown"},
}
err = man.initialise(rules)
- if err == nil || man.initialised {
+ if err == nil {
t.Error("initialise with 'unknown' rule list should have
failed")
}
@@ -84,20 +61,18 @@ func TestManagerInit(t *testing.T) {
{Name: "test"},
}
err = man.initialise(rules)
- if err != nil || !man.initialised {
- t.Errorf("failed to init existing manager, init state: %t,
error: %v", man.initialised, err)
- }
- // update the manager: remove rules init state is reverted
+ assert.NilError(t, err, "failed to init existing manager")
+
+ // update the manager: remove rules implicit state is reverted
rules = []configs.PlacementRule{}
err = man.initialise(rules)
- if err == nil || !man.initialised {
- t.Errorf("init should have failed with empty list, init state:
%t, error: %v", man.initialised, err)
- }
+ assert.NilError(t, err, "Failed to re-initialize empty placement rules")
+ assert.Equal(t, 2, len(man.rules), "wrong rule count for newly empty
placement manager")
+
// check if we handle a nil list
err = man.initialise(nil)
- if err == nil || !man.initialised {
- t.Errorf("init should have failed with nil list, init state:
%t, error: %v", man.initialised, err)
- }
+ assert.NilError(t, err, "Failed to re-initialize nil placement rules")
+ assert.Equal(t, 2, len(man.rules), "wrong rule count for nil placement
manager")
}
func TestManagerUpdate(t *testing.T) {
@@ -108,20 +83,18 @@ func TestManagerUpdate(t *testing.T) {
{Name: "test"},
}
err := man.UpdateRules(rules)
- if err != nil || !man.initialised {
- t.Errorf("failed to update existing manager, init state: %t,
error: %v", man.initialised, err)
- }
+ assert.NilError(t, err, "failed to update existing manager")
+
// update the manager: remove rules init state is reverted
rules = []configs.PlacementRule{}
err = man.UpdateRules(rules)
- if err != nil || man.initialised {
- t.Errorf("failed to update existing manager, init state: %t,
error: %v", man.initialised, err)
- }
+ assert.NilError(t, err, "Failed to re-initialize empty placement rules")
+ assert.Equal(t, 2, len(man.rules), "wrong rule count for newly empty
placement manager")
+
// check if we handle a nil list
err = man.UpdateRules(nil)
- if err != nil || man.initialised {
- t.Errorf("failed to update existing manager with nil list, init
state: %t, error: %v", man.initialised, err)
- }
+ assert.NilError(t, err, "Failed to re-initialize nil placement rules")
+ assert.Equal(t, 2, len(man.rules), "wrong rule count for nil placement
manager")
}
func TestManagerBuildRule(t *testing.T) {
@@ -134,8 +107,8 @@ func TestManagerBuildRule(t *testing.T) {
if err != nil {
t.Errorf("test rule build should not have failed, err: %v", err)
}
- if len(ruleObjs) != 1 {
- t.Errorf("test rule build should have created 1 rule found:
%d", len(ruleObjs))
+ if len(ruleObjs) != 2 {
+ t.Errorf("test rule build should have created 2 rules found:
%d", len(ruleObjs))
}
// rule with a parent rule should only be 1 rule in the list
@@ -147,8 +120,8 @@ func TestManagerBuildRule(t *testing.T) {
},
}
ruleObjs, err = man.buildRules(rules)
- if err != nil || len(ruleObjs) != 1 {
- t.Errorf("test rule build should not have failed and created 1
top level rule, err: %v, rules: %v", err, ruleObjs)
+ if err != nil || len(ruleObjs) != 2 {
+ t.Errorf("test rule build should not have failed and created 2
top level rule, err: %v, rules: %v", err, ruleObjs)
} else {
parent := ruleObjs[0].getParent()
if parent == nil || parent.getName() != "test" {
@@ -162,9 +135,9 @@ func TestManagerBuildRule(t *testing.T) {
{Name: "test"},
}
ruleObjs, err = man.buildRules(rules)
- if err != nil || len(ruleObjs) != 2 {
- t.Errorf("rule build should not have failed and created 2 rule,
err: %v, rules: %v", err, ruleObjs)
- } else if ruleObjs[0].getName() != "user" || ruleObjs[1].getName() !=
"test" {
+ if err != nil || len(ruleObjs) != 3 {
+ t.Errorf("rule build should not have failed and created 3
rules, err: %v, rules: %v", err, ruleObjs)
+ } else if ruleObjs[0].getName() != "user" || ruleObjs[1].getName() !=
"test" || ruleObjs[2].getName() != "recovery" {
t.Errorf("rule build order is not preserved: %v", ruleObjs)
}
}
@@ -207,9 +180,7 @@ partitions:
Create: true},
}
err = man.UpdateRules(rules)
- if err != nil || !man.initialised {
- t.Errorf("failed to update existing manager, init state: %t,
error: %v", man.initialised, err)
- }
+ assert.NilError(t, err, "failed to update existing manager")
tags := make(map[string]string)
user := security.UserGroup{
User: "testchild",
diff --git a/pkg/scheduler/placement/provided_rule.go
b/pkg/scheduler/placement/provided_rule.go
index 203cc28c..e7fa15f5 100644
--- a/pkg/scheduler/placement/provided_rule.go
+++ b/pkg/scheduler/placement/provided_rule.go
@@ -52,33 +52,35 @@ func (pr *providedRule) initialise(conf
configs.PlacementRule) error {
return err
}
-func (pr *providedRule) placeApplication(app *objects.Application, queueFn
func(string) *objects.Queue) (string, error) {
+func (pr *providedRule) placeApplication(app *objects.Application, queueFn
func(string) *objects.Queue) (string, bool, error) {
// since this is the provided rule we must have a queue in the info
already
queueName := app.GetQueuePath()
if queueName == "" {
- return "", nil
+ return "", true, nil
}
+
// before anything run the filter
if !pr.filter.allowUser(app.GetUser()) {
log.Log(log.Config).Debug("Provided rule filtered",
zap.String("application", app.ApplicationID),
zap.Any("user", app.GetUser()))
- return "", nil
+ return "", true, nil
}
var parentName string
+ var aclCheck = true
var err error
// if we have a fully qualified queue passed in do not run the parent
rule
if !strings.HasPrefix(queueName, configs.RootQueue+configs.DOT) {
// run the parent rule if set
if pr.parent != nil {
- parentName, err = pr.parent.placeApplication(app,
queueFn)
+ parentName, aclCheck, err =
pr.parent.placeApplication(app, queueFn)
// failed parent rule, fail this rule
if err != nil {
- return "", err
+ return "", aclCheck, err
}
// rule did not return a parent: this could be filter
or create flag related
if parentName == "" {
- return "", nil
+ return "", aclCheck, nil
}
// check if this is a parent queue and qualify it
if !strings.HasPrefix(parentName,
configs.RootQueue+configs.DOT) {
@@ -87,7 +89,7 @@ func (pr *providedRule) placeApplication(app
*objects.Application, queueFn func(
// if the parent queue exists it cannot be a leaf
parentQueue := queueFn(parentName)
if parentQueue != nil && parentQueue.IsLeafQueue() {
- return "", fmt.Errorf("parent rule returned a
leaf queue: %s", parentName)
+ return "", aclCheck, fmt.Errorf("parent rule
returned a leaf queue: %s", parentName)
}
}
// the parent is set from the rule otherwise set it to the root
@@ -104,10 +106,10 @@ func (pr *providedRule) placeApplication(app
*objects.Application, queueFn func(
queue := queueFn(queueName)
// if we cannot create the queue must exist
if !pr.create && queue == nil {
- return "", nil
+ return "", aclCheck, nil
}
log.Log(log.Config).Info("Provided rule application placed",
zap.String("application", app.ApplicationID),
zap.String("queue", queueName))
- return queueName, nil
+ return queueName, aclCheck, nil
}
diff --git a/pkg/scheduler/placement/provided_rule_test.go
b/pkg/scheduler/placement/provided_rule_test.go
index 5077c834..e400c922 100644
--- a/pkg/scheduler/placement/provided_rule_test.go
+++ b/pkg/scheduler/placement/provided_rule_test.go
@@ -57,22 +57,26 @@ partitions:
// queue that does not exists directly under the root
appInfo := newApplication("app1", "default", "unknown", user, tags,
nil, "")
var queue string
- queue, err = pr.placeApplication(appInfo, queueFunc)
+ var aclCheck bool
+ queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc)
if queue != "" || err != nil {
t.Errorf("provided rule placed app in incorrect queue '%s', err
%v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// trying to place when no queue provided in the app
appInfo = newApplication("app1", "default", "", user, tags, nil, "")
- queue, err = pr.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc)
if queue != "" || err != nil {
t.Errorf("provided rule placed app in incorrect queue '%s',
error %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// trying to place in a qualified queue that does not exist
appInfo = newApplication("app1", "default", "root.unknown", user, tags,
nil, "")
- queue, err = pr.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc)
if queue != "" || err != nil {
t.Errorf("provided rule placed app in incorrect queue '%s',
error %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// same queue now with create flag
conf = configs.PlacementRule{
Name: "provided",
@@ -82,10 +86,11 @@ partitions:
if err != nil || pr == nil {
t.Errorf("provided rule create failed, err %v", err)
}
- queue, err = pr.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc)
if queue != "root.unknown" || err != nil {
t.Errorf("provided rule placed app in incorrect queue '%s',
error %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
conf = configs.PlacementRule{
Name: "provided",
@@ -101,18 +106,20 @@ partitions:
// unqualified queue with parent rule that exists directly in hierarchy
appInfo = newApplication("app1", "default", "testchild", user, tags,
nil, "")
- queue, err = pr.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc)
if queue != "root.testparent.testchild" || err != nil {
t.Errorf("provided rule failed to place queue in correct queue
'%s', err %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// qualified queue with parent rule (parent rule ignored)
appInfo = newApplication("app1", "default", "root.testparent", user,
tags, nil, "")
- queue, err = pr.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc)
if queue != "root.testparent" || err != nil {
t.Errorf("provided rule placed in to be created queue with
create false '%s', err %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
}
func TestProvidedRuleParent(t *testing.T) {
@@ -142,10 +149,12 @@ func TestProvidedRuleParent(t *testing.T) {
appInfo := newApplication("app1", "default", "unknown", user, tags,
nil, "")
var queue string
- queue, err = pr.placeApplication(appInfo, queueFunc)
+ var aclCheck bool
+ queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc)
if queue != "" || err != nil {
t.Errorf("provided rule placed app in incorrect queue '%s', err
%v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// trying to place in a child using a non creatable parent
conf = configs.PlacementRule{
@@ -163,10 +172,11 @@ func TestProvidedRuleParent(t *testing.T) {
}
appInfo = newApplication("app1", "default", "testchild", user, tags,
nil, "")
- queue, err = pr.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc)
if queue != "" || err != nil {
t.Errorf("provided rule placed app in incorrect queue '%s', err
%v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// trying to place in a child using a creatable parent
conf = configs.PlacementRule{
@@ -182,10 +192,11 @@ func TestProvidedRuleParent(t *testing.T) {
if err != nil || pr == nil {
t.Errorf("provided rule create failed, err %v", err)
}
- queue, err = pr.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc)
if queue != nameParentChild || err != nil {
t.Errorf("provided rule with non existing parent queue should
create '%s', error %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// trying to place in a child using a parent which is defined as a leaf
conf = configs.PlacementRule{
@@ -202,8 +213,9 @@ func TestProvidedRuleParent(t *testing.T) {
}
appInfo = newApplication("app1", "default", "unknown", user, tags, nil,
"")
- queue, err = pr.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc)
if queue != "" || err == nil {
t.Errorf("provided rule placed app in incorrect queue '%s', err
%v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
}
diff --git a/pkg/scheduler/placement/recovery_rule.go
b/pkg/scheduler/placement/recovery_rule.go
new file mode 100644
index 00000000..71f7dc23
--- /dev/null
+++ b/pkg/scheduler/placement/recovery_rule.go
@@ -0,0 +1,58 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package placement
+
+import (
+ "go.uber.org/zap"
+
+ "github.com/apache/yunikorn-core/pkg/common"
+ "github.com/apache/yunikorn-core/pkg/common/configs"
+ "github.com/apache/yunikorn-core/pkg/log"
+ "github.com/apache/yunikorn-core/pkg/scheduler/objects"
+ "github.com/apache/yunikorn-core/pkg/scheduler/placement/types"
+)
+
+// recoveryRule places an application into the recovery queue if no other rule matched and the
+// application submission is forced. It is appended implicitly after all configured placement rules
+// so that an application corresponding to an already-executing workload can be accepted successfully.
+type recoveryRule struct {
+ basicRule
+}
+
+func (rr *recoveryRule) getName() string {
+ return types.Recovery
+}
+
+func (rr *recoveryRule) initialise(conf configs.PlacementRule) error {
+ // no configuration needed for the recovery rule
+ return nil
+}
+
+func (rr *recoveryRule) placeApplication(app *objects.Application, _ func(string) *objects.Queue) (string, bool, error) {
+ // only forced applications resolve to the recovery queue; ACLs are never checked for this rule
+ if !app.IsCreateForced() {
+ return "", false, nil
+ }
+
+ queueName := common.RecoveryQueueFull
+ log.Log(log.Config).Info("Recovery rule application placed",
+ zap.String("application", app.ApplicationID),
+ zap.String("queue", queueName))
+ return queueName, false, nil
+}
diff --git a/pkg/scheduler/placement/recovery_rule_test.go
b/pkg/scheduler/placement/recovery_rule_test.go
new file mode 100644
index 00000000..7185396d
--- /dev/null
+++ b/pkg/scheduler/placement/recovery_rule_test.go
@@ -0,0 +1,80 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package placement
+
+import (
+ "testing"
+
+ "gotest.tools/v3/assert"
+
+ "github.com/apache/yunikorn-core/pkg/common"
+ "github.com/apache/yunikorn-core/pkg/common/configs"
+ "github.com/apache/yunikorn-core/pkg/common/security"
+ siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
+)
+
+func TestRecoveryRuleInitialise(t *testing.T) {
+ // the recovery rule needs no configuration, initialise must always succeed
+ rr := &recoveryRule{}
+ err := rr.initialise(configs.PlacementRule{
+ Name: "recovery",
+ })
+ assert.NilError(t, err, "unexpected error in initialize")
+}
+
+func TestRecoveryRulePlace(t *testing.T) {
+ rr := &recoveryRule{}
+
+ // Create the structure for the test
+ data := `
+partitions:
+ - name: default
+ queues:
+ - name: testqueue
+ - name: testparent
+ queues:
+ - name: testchild
+`
+ err := initQueueStructure([]byte(data))
+ assert.NilError(t, err, "setting up the queue config failed")
+
+ // verify that non-forced app is not recovered
+ user := security.UserGroup{
+ User: "testuser",
+ Groups: []string{},
+ }
+ tags := make(map[string]string)
+ app := newApplication("app1", "default", "ignored", user, tags, nil, "")
+
+ var queue string
+ var aclCheck bool
+ queue, aclCheck, err = rr.placeApplication(app, queueFunc)
+ if queue != "" || err != nil {
+ t.Errorf("recovery rule did not bypass non-forced application, resolved queue '%s', err %v", queue, err)
+ }
+ assert.Check(t, !aclCheck, "acl check should not be set for recovery rule")
+
+ tags[siCommon.AppTagCreateForce] = "true"
+ app = newApplication("app1", "default", "ignored", user, tags, nil, "")
+ queue, aclCheck, err = rr.placeApplication(app, queueFunc)
+ if queue != common.RecoveryQueueFull || err != nil {
+ t.Errorf("recovery rule did not place forced application into recovery queue, resolved queue '%s', err %v", queue, err)
+ }
+ assert.Check(t, !aclCheck, "acl check should not be set for recovery rule")
+}
diff --git a/pkg/scheduler/placement/rule.go b/pkg/scheduler/placement/rule.go
index 3120d886..94a04b45 100644
--- a/pkg/scheduler/placement/rule.go
+++ b/pkg/scheduler/placement/rule.go
@@ -38,8 +38,9 @@ type rule interface {
// Execute the rule and return the queue getName the application is
placed in.
// Returns the fully qualified queue getName if the rule finds a queue
or an empty string if the rule did not match.
+ // Additionally, returns true if ACLs should be checked or false
otherwise.
// The error must only be set if there is a failure while executing the
rule not if the rule did not match.
- placeApplication(app *objects.Application, queueFn func(string)
*objects.Queue) (string, error)
+ placeApplication(app *objects.Application, queueFn func(string)
*objects.Queue) (string, bool, error)
// Return the getName of the rule which is defined in the rule.
// The basicRule provides a "unnamed rule" implementation.
diff --git a/pkg/scheduler/placement/rule_test.go
b/pkg/scheduler/placement/rule_test.go
index f4e741c8..c6d4e49f 100644
--- a/pkg/scheduler/placement/rule_test.go
+++ b/pkg/scheduler/placement/rule_test.go
@@ -61,31 +61,36 @@ func TestPlaceApp(t *testing.T) {
}
nr, err := newRule(conf)
assert.NilError(t, err, "unexpected rule initialisation error")
+ var aclCheck bool
// place application that should fail
- _, err = nr.placeApplication(nil, nil)
+ _, aclCheck, err = nr.placeApplication(nil, nil)
if err == nil {
t.Error("test rule place application did not fail as expected")
}
+ assert.Check(t, aclCheck, "acls should be checked")
var queue string
// place application that should not fail and return "test"
- queue, err = nr.placeApplication(&objects.Application{}, nil)
+ queue, aclCheck, err = nr.placeApplication(&objects.Application{}, nil)
if err != nil || queue != "test" {
t.Errorf("test rule place application did not fail, err: %v, ",
err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// place application that should not fail and return the queue in the
object
app := &objects.Application{}
app.SetQueuePath("passedin")
- queue, err = nr.placeApplication(app, nil)
+ queue, aclCheck, err = nr.placeApplication(app, nil)
if err != nil || queue != "passedin" {
t.Errorf("test rule place application did not fail, err: %v, ",
err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// place application that should not fail and return the queue in the
object
app = &objects.Application{}
app.SetQueuePath("user.name")
- queue, err = nr.placeApplication(app, nil)
+ queue, aclCheck, err = nr.placeApplication(app, nil)
if err != nil || queue != "user_dot_name" {
t.Errorf("test rule place application did not fail, err: %v, ",
err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
}
func TestReplaceDot(t *testing.T) {
diff --git a/pkg/scheduler/placement/tag_rule.go
b/pkg/scheduler/placement/tag_rule.go
index 99030b75..1282a86a 100644
--- a/pkg/scheduler/placement/tag_rule.go
+++ b/pkg/scheduler/placement/tag_rule.go
@@ -57,11 +57,11 @@ func (tr *tagRule) initialise(conf configs.PlacementRule)
error {
return err
}
-func (tr *tagRule) placeApplication(app *objects.Application, queueFn
func(string) *objects.Queue) (string, error) {
+func (tr *tagRule) placeApplication(app *objects.Application, queueFn
func(string) *objects.Queue) (string, bool, error) {
// if the tag is not present we can skipp all other processing
tagVal := app.GetTag(tr.tagName)
if tagVal == "" {
- return "", nil
+ return "", true, nil
}
// before anything run the filter
if !tr.filter.allowUser(app.GetUser()) {
@@ -69,23 +69,24 @@ func (tr *tagRule) placeApplication(app
*objects.Application, queueFn func(strin
zap.String("application", app.ApplicationID),
zap.Any("user", app.GetUser()),
zap.String("tagName", tr.tagName))
- return "", nil
+ return "", true, nil
}
var parentName string
+ var aclCheck = true
var err error
queueName := tagVal
// if we have a fully qualified queue in the value do not run the
parent rule
if !strings.HasPrefix(queueName, configs.RootQueue+configs.DOT) {
// run the parent rule if set
if tr.parent != nil {
- parentName, err = tr.parent.placeApplication(app,
queueFn)
+ parentName, aclCheck, err =
tr.parent.placeApplication(app, queueFn)
// failed parent rule, fail this rule
if err != nil {
- return "", err
+ return "", aclCheck, err
}
// rule did not match: this could be filter or create
flag related
if parentName == "" {
- return "", nil
+ return "", aclCheck, nil
}
// check if this is a parent queue and qualify it
if !strings.HasPrefix(parentName,
configs.RootQueue+configs.DOT) {
@@ -94,7 +95,7 @@ func (tr *tagRule) placeApplication(app *objects.Application,
queueFn func(strin
// if the parent queue exists it cannot be a leaf
parentQueue := queueFn(parentName)
if parentQueue != nil && parentQueue.IsLeafQueue() {
- return "", fmt.Errorf("parent rule returned a
leaf queue: %s", parentName)
+ return "", aclCheck, fmt.Errorf("parent rule
returned a leaf queue: %s", parentName)
}
}
// the parent is set from the rule otherwise set it to the root
@@ -110,10 +111,10 @@ func (tr *tagRule) placeApplication(app
*objects.Application, queueFn func(strin
queue := queueFn(queueName)
// if we cannot create the queue it must exist, rule does not match
otherwise
if !tr.create && queue == nil {
- return "", nil
+ return "", aclCheck, nil
}
log.Log(log.Config).Info("Tag rule application placed",
zap.String("application", app.ApplicationID),
zap.String("queue", queueName))
- return queueName, nil
+ return queueName, aclCheck, nil
}
diff --git a/pkg/scheduler/placement/tag_rule_test.go
b/pkg/scheduler/placement/tag_rule_test.go
index 548548dd..251cf81a 100644
--- a/pkg/scheduler/placement/tag_rule_test.go
+++ b/pkg/scheduler/placement/tag_rule_test.go
@@ -23,6 +23,7 @@ import (
"gotest.tools/v3/assert"
+ "github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/security"
)
@@ -89,34 +90,48 @@ partitions:
tags := make(map[string]string)
appInfo := newApplication("app1", "default", "ignored", user, tags,
nil, "")
var queue string
- queue, err = tr.placeApplication(appInfo, queueFunc)
+ var aclCheck bool
+ queue, aclCheck, err = tr.placeApplication(appInfo, queueFunc)
if queue != "" || err != nil {
t.Errorf("tag rule failed with no tag value '%s', err %v",
queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// tag queue that exists directly in hierarchy
tags = map[string]string{"label1": "testqueue"}
appInfo = newApplication("app1", "default", "ignored", user, tags, nil,
"")
- queue, err = tr.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = tr.placeApplication(appInfo, queueFunc)
if queue != "root.testqueue" || err != nil {
t.Errorf("tag rule failed to place queue in correct queue '%s',
err %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// tag queue that does not exists
tags = map[string]string{"label1": "unknown"}
appInfo = newApplication("app1", "default", "ignored", user, tags, nil,
"")
- queue, err = tr.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = tr.placeApplication(appInfo, queueFunc)
if queue != "" || err != nil {
t.Errorf("tag rule placed in queue that does not exists '%s',
err %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// tag queue fully qualified
tags = map[string]string{"label1": "root.testparent.testchild"}
appInfo = newApplication("app1", "default", "ignored", user, tags, nil,
"")
- queue, err = tr.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = tr.placeApplication(appInfo, queueFunc)
if queue != "root.testparent.testchild" || err != nil {
t.Errorf("tag rule did fail with qualified queue '%s', error
%v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
+
+ // tag queue references recovery
+ tags = map[string]string{"label1": common.RecoveryQueueFull}
+ appInfo = newApplication("app1", "default", "ignored", user, tags, nil,
"")
+ queue, aclCheck, err = tr.placeApplication(appInfo, queueFunc)
+ if queue != "" || err != nil {
+ t.Errorf("tag rule failed with explicit recovery queue: queue
'%s', error %v", queue, err)
+ }
+ assert.Check(t, aclCheck, "acls should be checked")
// trying to place in a child using a parent
conf = configs.PlacementRule{
@@ -133,16 +148,18 @@ partitions:
}
tags = map[string]string{"label1": "testchild"}
appInfo = newApplication("app1", "default", "ignored", user, tags, nil,
"")
- queue, err = tr.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = tr.placeApplication(appInfo, queueFunc)
if queue != "" || err != nil {
t.Errorf("tag rule with parent queue should have failed value
not set '%s', error %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
tags = map[string]string{"label1": "testchild", "label2": "testparent"}
appInfo = newApplication("app1", "default", "ignored", user, tags, nil,
"")
- queue, err = tr.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = tr.placeApplication(appInfo, queueFunc)
if queue != "root.testparent.testchild" || err != nil {
t.Errorf("tag rule with parent queue incorrect queue '%s',
error %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
}
func TestTagRuleParent(t *testing.T) {
@@ -173,10 +190,12 @@ func TestTagRuleParent(t *testing.T) {
tags := map[string]string{"label1": "testchild", "label2": "testparent"}
appInfo := newApplication("app1", "default", "unknown", user, tags,
nil, "")
var queue string
- queue, err = ur.placeApplication(appInfo, queueFunc)
+ var aclCheck bool
+ queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc)
if queue != "" || err != nil {
t.Errorf("tag rule placed app in incorrect queue '%s', err %v",
queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// trying to place in a child using a non creatable parent
conf = configs.PlacementRule{
@@ -196,10 +215,11 @@ func TestTagRuleParent(t *testing.T) {
tags = map[string]string{"label1": "testchild", "label2":
"testparentnew"}
appInfo = newApplication("app1", "default", "unknown", user, tags, nil,
"")
- queue, err = ur.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc)
if queue != "" || err != nil {
t.Errorf("tag rule placed app in incorrect queue '%s', err %v",
queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// trying to place in a child using a creatable parent
conf = configs.PlacementRule{
@@ -216,10 +236,11 @@ func TestTagRuleParent(t *testing.T) {
if err != nil || ur == nil {
t.Errorf("tag rule create failed with queue name, err %v", err)
}
- queue, err = ur.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc)
if queue != nameParentChild || err != nil {
t.Errorf("user rule with non existing parent queue should
create '%s', error %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// trying to place in a child using a parent which is defined as a leaf
conf = configs.PlacementRule{
@@ -237,8 +258,9 @@ func TestTagRuleParent(t *testing.T) {
}
appInfo = newApplication("app1", "default", "unknown", user, tags, nil,
"")
- queue, err = ur.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc)
if queue != "" || err == nil {
t.Errorf("tag rule placed app in incorrect queue '%s', err %v",
queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
}
diff --git a/pkg/scheduler/placement/testrule.go
b/pkg/scheduler/placement/testrule.go
index c94bfe87..7554d42d 100644
--- a/pkg/scheduler/placement/testrule.go
+++ b/pkg/scheduler/placement/testrule.go
@@ -48,12 +48,12 @@ func (tr *testRule) initialise(conf configs.PlacementRule)
error {
}
// Simple test rule that just checks the app passed in and returns fixed queue
names.
-func (tr *testRule) placeApplication(app *objects.Application, queueFn
func(string) *objects.Queue) (string, error) {
+func (tr *testRule) placeApplication(app *objects.Application, queueFn
func(string) *objects.Queue) (string, bool, error) {
if app == nil {
- return "", fmt.Errorf("nil app passed in")
+ return "", true, fmt.Errorf("nil app passed in")
}
if queuePath := app.GetQueuePath(); queuePath != "" {
- return replaceDot(queuePath), nil
+ return replaceDot(queuePath), true, nil
}
- return "test", nil
+ return types.Test, true, nil
}
diff --git a/pkg/scheduler/placement/types/types.go
b/pkg/scheduler/placement/types/types.go
index 66149310..6762b987 100644
--- a/pkg/scheduler/placement/types/types.go
+++ b/pkg/scheduler/placement/types/types.go
@@ -24,4 +24,5 @@ const (
Provided = "provided"
Tag = "tag"
Test = "test"
+ Recovery = "recovery"
)
diff --git a/pkg/scheduler/placement/user_rule.go
b/pkg/scheduler/placement/user_rule.go
index 9ecb9eae..f7113cf2 100644
--- a/pkg/scheduler/placement/user_rule.go
+++ b/pkg/scheduler/placement/user_rule.go
@@ -49,27 +49,28 @@ func (ur *userRule) initialise(conf configs.PlacementRule)
error {
return err
}
-func (ur *userRule) placeApplication(app *objects.Application, queueFn
func(string) *objects.Queue) (string, error) {
+func (ur *userRule) placeApplication(app *objects.Application, queueFn
func(string) *objects.Queue) (string, bool, error) {
// before anything run the filter
userName := app.GetUser().User
if !ur.filter.allowUser(app.GetUser()) {
log.Log(log.Config).Debug("User rule filtered",
zap.String("application", app.ApplicationID),
zap.Any("user", app.GetUser()))
- return "", nil
+ return "", true, nil
}
var parentName string
+ var aclCheck = true
var err error
// run the parent rule if set
if ur.parent != nil {
- parentName, err = ur.parent.placeApplication(app, queueFn)
+ parentName, aclCheck, err = ur.parent.placeApplication(app,
queueFn)
// failed parent rule, fail this rule
if err != nil {
- return "", err
+ return "", aclCheck, err
}
// rule did not match: this could be filter or create flag
related
if parentName == "" {
- return "", nil
+ return "", aclCheck, nil
}
// check if this is a parent queue and qualify it
if !strings.HasPrefix(parentName,
configs.RootQueue+configs.DOT) {
@@ -78,7 +79,7 @@ func (ur *userRule) placeApplication(app
*objects.Application, queueFn func(stri
// if the parent queue exists it cannot be a leaf
parentQueue := queueFn(parentName)
if parentQueue != nil && parentQueue.IsLeafQueue() {
- return "", fmt.Errorf("parent rule returned a leaf
queue: %s", parentName)
+ return "", aclCheck, fmt.Errorf("parent rule returned a
leaf queue: %s", parentName)
}
}
// the parent is set from the rule otherwise set it to the root
@@ -93,10 +94,10 @@ func (ur *userRule) placeApplication(app
*objects.Application, queueFn func(stri
queue := queueFn(queueName)
// if we cannot create the queue it must exist, rule does not match
otherwise
if !ur.create && queue == nil {
- return "", nil
+ return "", aclCheck, nil
}
log.Log(log.Config).Info("User rule application placed",
zap.String("application", app.ApplicationID),
zap.String("queue", queueName))
- return queueName, nil
+ return queueName, aclCheck, nil
}
diff --git a/pkg/scheduler/placement/user_rule_test.go
b/pkg/scheduler/placement/user_rule_test.go
index 9e7126e3..c38a89ff 100644
--- a/pkg/scheduler/placement/user_rule_test.go
+++ b/pkg/scheduler/placement/user_rule_test.go
@@ -59,30 +59,34 @@ partitions:
t.Errorf("user rule create failed, err %v", err)
}
var queue string
- queue, err = ur.placeApplication(appInfo, queueFunc)
+ var aclCheck bool
+ queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc)
if queue != "root.testchild" || err != nil {
t.Errorf("user rule failed to place queue in correct queue
'%s', err %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// trying to place in a parent queue should fail on queue create not in
the rule
user = security.UserGroup{
User: "testparent",
Groups: []string{},
}
appInfo = newApplication("app1", "default", "ignored", user, tags, nil,
"")
- queue, err = ur.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc)
if queue != "root.testparent" || err != nil {
t.Errorf("user rule failed with parent queue '%s', error %v",
queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
user = security.UserGroup{
User: "test.user",
Groups: []string{},
}
appInfo = newApplication("app1", "default", "ignored", user, tags, nil,
"")
- queue, err = ur.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc)
if queue == "" || err != nil {
t.Errorf("user rule with dotted user should not have failed
'%s', error %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// user queue that exists directly in hierarchy
conf = configs.PlacementRule{
@@ -101,10 +105,11 @@ partitions:
if err != nil || ur == nil {
t.Errorf("user rule create failed with queue name, err %v", err)
}
- queue, err = ur.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc)
if queue != "root.testparent.testchild" || err != nil {
t.Errorf("user rule failed to place queue in correct queue
'%s', err %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// user queue that does not exists
user = security.UserGroup{
@@ -121,10 +126,11 @@ partitions:
if err != nil || ur == nil {
t.Errorf("user rule create failed with queue name, err %v", err)
}
- queue, err = ur.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc)
if queue != "root.unknown" || err != nil {
t.Errorf("user rule placed in to be created queue with create
false '%s', err %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
}
func TestUserRuleParent(t *testing.T) {
@@ -154,10 +160,12 @@ func TestUserRuleParent(t *testing.T) {
appInfo := newApplication("app1", "default", "unknown", user, tags,
nil, "")
var queue string
- queue, err = ur.placeApplication(appInfo, queueFunc)
+ var aclCheck bool
+ queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc)
if queue != "" || err != nil {
t.Errorf("user rule placed app in incorrect queue '%s', err
%v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// trying to place in a child using a non creatable parent
conf = configs.PlacementRule{
@@ -175,10 +183,11 @@ func TestUserRuleParent(t *testing.T) {
}
appInfo = newApplication("app1", "default", "unknown", user, tags, nil,
"")
- queue, err = ur.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc)
if queue != "" || err != nil {
t.Errorf("user rule placed app in incorrect queue '%s', err
%v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// trying to place in a child using a creatable parent
conf = configs.PlacementRule{
@@ -194,10 +203,11 @@ func TestUserRuleParent(t *testing.T) {
if err != nil || ur == nil {
t.Errorf("user rule create failed with queue name, err %v", err)
}
- queue, err = ur.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc)
if queue != nameParentChild || err != nil {
t.Errorf("user rule with non existing parent queue should
create '%s', error %v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
// trying to place in a child using a parent which is defined as a leaf
conf = configs.PlacementRule{
@@ -214,8 +224,9 @@ func TestUserRuleParent(t *testing.T) {
}
appInfo = newApplication("app1", "default", "unknown", user, tags, nil,
"")
- queue, err = ur.placeApplication(appInfo, queueFunc)
+ queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc)
if queue != "" || err == nil {
t.Errorf("user rule placed app in incorrect queue '%s', err
%v", queue, err)
}
+ assert.Check(t, aclCheck, "acls should be checked")
}
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index e4eb9e97..80b44a08 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -422,6 +422,10 @@ func newPlacementPartition() (*PartitionContext, error) {
}
func newApplication(appID, partition, queueName string) *objects.Application {
+ return newApplicationTags(appID, partition, queueName, nil)
+}
+
+func newApplicationTags(appID, partition, queueName string, tags
map[string]string) *objects.Application {
user := security.UserGroup{
User: "testuser",
Groups: []string{"testgroup"},
@@ -434,6 +438,7 @@ func newApplicationWithUser(appID, partition, queueName
string, user security.Us
ApplicationID: appID,
QueueName: queueName,
PartitionName: partition,
+ Tags: tags,
}
return objects.NewApplication(siApp, user, nil, rmID)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]