This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 78862645 [YUNIKORN-2547] Queue: clean up logic when adding application (#838)
78862645 is described below
commit 78862645c7e3f790b4bf2543865ba5a85c115163
Author: Peter Bacsko <[email protected]>
AuthorDate: Mon Apr 22 21:41:20 2024 +0800
[YUNIKORN-2547] Queue: clean up logic when adding application (#838)
Closes: #838
Signed-off-by: Chia-Ping Tsai <[email protected]>
---
pkg/scheduler/objects/application.go | 33 +++++++-
pkg/scheduler/objects/application_test.go | 85 +++++++++++++++++++
pkg/scheduler/objects/queue.go | 76 +++--------------
pkg/scheduler/objects/queue_test.go | 136 +-----------------------------
pkg/scheduler/partition.go | 14 +--
pkg/scheduler/partition_test.go | 51 +++++++++--
6 files changed, 186 insertions(+), 209 deletions(-)
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index e6940680..cba48664 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -40,6 +40,7 @@ import (
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
+ siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -1525,7 +1526,6 @@ func (sa *Application) SetQueue(queue *Queue) {
defer sa.Unlock()
sa.queuePath = queue.QueuePath
sa.queue = queue
- sa.queue.UpdateApplicationPriority(sa.ApplicationID, sa.askMaxPriority)
}
// remove the leaf queue the application runs in, used when completing the app
@@ -2115,3 +2115,34 @@ func (sa *Application) updateRunnableStatus(runnableInQueue, runnableByUserLimit
}
sa.runnableByUserLimit = runnableByUserLimit
}
+
+// GetGuaranteedResource returns the guaranteed resource that is set in the
application tags
+func (sa *Application) GetGuaranteedResource() *resources.Resource {
+ return
sa.getResourceFromTags(siCommon.AppTagNamespaceResourceGuaranteed)
+}
+
+// GetMaxResource returns the max resource that is set in the application tags
+func (sa *Application) GetMaxResource() *resources.Resource {
+ return sa.getResourceFromTags(siCommon.AppTagNamespaceResourceQuota)
+}
+
+func (sa *Application) getResourceFromTags(tag string) *resources.Resource {
+ value := sa.GetTag(tag)
+ if value == "" {
+ return nil
+ }
+
+ resource, err := resources.NewResourceFromString(value)
+ if err != nil {
+ log.Log(log.SchedQueue).Warn("application resource conversion
failure",
+ zap.String("tag", tag),
+ zap.String("json string", value),
+ zap.Error(err))
+ } else if !resources.StrictlyGreaterThanZero(resource) {
+ log.Log(log.SchedQueue).Warn("resource quantities should be
greater than zero",
+ zap.Stringer("resource", resource))
+ resource = nil
+ }
+
+ return resource
+}
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 2300d729..75e6da5a 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -105,6 +105,46 @@ func TestNewApplication(t *testing.T) {
assert.Assert(t, app.IsNew(), "new application must be in new state")
assert.Equal(t, app.execTimeout, defaultPlaceholderTimeout, "no timeout
passed in should be modified default")
assert.Assert(t, resources.Equals(app.placeholderAsk, res),
"placeholder ask not set as expected")
+
+ // valid tags
+ siApp = &si.AddApplicationRequest{}
+ siApp.Tags = map[string]string{
+ siCommon.AppTagNamespaceResourceQuota:
"{\"resources\":{\"validMaxRes\":{\"value\":11}}}",
+ siCommon.AppTagNamespaceResourceGuaranteed:
"{\"resources\":{\"validGuaranteed\":{\"value\":22}}}",
+ }
+ app = NewApplication(siApp, user, nil, "")
+ guaranteed := app.GetGuaranteedResource()
+ maxResource := app.GetMaxResource()
+ assert.Assert(t, guaranteed != nil, "guaranteed resource has not been
set")
+ assert.Equal(t, 1, len(guaranteed.Resources), "more than one resource
has been set")
+ assert.Equal(t, resources.Quantity(22),
guaranteed.Resources["validGuaranteed"])
+ assert.Assert(t, maxResource != nil, "maximum resource has not been
set")
+ assert.Equal(t, 1, len(maxResource.Resources), "more than one resource
has been set")
+ assert.Equal(t, resources.Quantity(11),
maxResource.Resources["validMaxRes"], "maximum resource is incorrect")
+
+ // invalid tags
+ siApp = &si.AddApplicationRequest{}
+ siApp.Tags = map[string]string{
+ siCommon.AppTagNamespaceResourceQuota: "{xxxxxx}",
+ siCommon.AppTagNamespaceResourceGuaranteed: "{yyyyy}",
+ }
+ app = NewApplication(siApp, user, nil, "")
+ guaranteed = app.GetGuaranteedResource()
+ maxResource = app.GetMaxResource()
+ assert.Assert(t, guaranteed == nil, "guaranteed resource should have
not been set")
+ assert.Assert(t, maxResource == nil, "maximum resource should have not
been set")
+
+ // negative values
+ siApp = &si.AddApplicationRequest{}
+ siApp.Tags = map[string]string{
+ siCommon.AppTagNamespaceResourceQuota:
"{\"resources\":{\"negativeMax\":{\"value\":-11}}}",
+ siCommon.AppTagNamespaceResourceGuaranteed:
"{\"resources\":{\"negativeGuaranteed\":{\"value\":-22}}}",
+ }
+ app = NewApplication(siApp, user, nil, "")
+ guaranteed = app.GetGuaranteedResource()
+ maxResource = app.GetMaxResource()
+ assert.Assert(t, guaranteed == nil, "guaranteed resource should have
not been set")
+ assert.Assert(t, maxResource == nil, "maximum resource should have not
been set")
}
// test basic reservations
@@ -2645,6 +2685,51 @@ func TestUpdateRunnableStatus(t *testing.T) {
assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUOTA,
eventSystem.Events[1].EventChangeDetail)
}
+func TestGetMaxResourceFromTag(t *testing.T) {
+ app := newApplication(appID0, "default", "root.unknown")
+ testGetResourceFromTag(t, siCommon.AppTagNamespaceResourceQuota,
app.tags, app.GetMaxResource)
+}
+
+func TestGuaranteedResourceFromTag(t *testing.T) {
+ app := newApplication(appID0, "default", "root.unknown")
+ testGetResourceFromTag(t, siCommon.AppTagNamespaceResourceGuaranteed,
app.tags, app.GetGuaranteedResource)
+}
+
+func testGetResourceFromTag(t *testing.T, tagName string, tags
map[string]string, getResource func() *resources.Resource) {
+ assert.Equal(t, 0, len(tags), "tags are not empty")
+
+ // no value for tag
+ res := getResource()
+ assert.Assert(t, res == nil, "unexpected resource")
+
+ // empty value
+ tags[tagName] = ""
+ res = getResource()
+ assert.Assert(t, res == nil, "unexpected resource")
+
+ // valid value
+ tags[tagName] = "{\"resources\":{\"vcore\":{\"value\":111}}}"
+ res = getResource()
+ assert.Assert(t, res != nil)
+ assert.Equal(t, 1, len(res.Resources))
+ assert.Equal(t, resources.Quantity(111), res.Resources["vcore"])
+
+ // zero
+ tags[tagName] = "{\"resources\":{\"vcore\":{\"value\":0}}}"
+ res = getResource()
+ assert.Assert(t, res == nil)
+
+ // negative
+ tags[tagName] = "{\"resources\":{\"vcore\":{\"value\":-12}}}"
+ res = getResource()
+ assert.Assert(t, res == nil, "unexpected resource")
+
+ // invalid value
+ tags[tagName] = "{xyz}"
+ res = getResource()
+ assert.Assert(t, res == nil, "unexpected resource")
+}
+
func (sa *Application) addPlaceholderDataWithLocking(ask *AllocationAsk) {
sa.Lock()
defer sa.Unlock()
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index c231f8cf..5c5bc004 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -41,7 +41,6 @@ import (
"github.com/apache/yunikorn-core/pkg/scheduler/policies"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
- siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
)
var (
@@ -357,7 +356,7 @@ func (sq *Queue) applyConf(conf configs.QueueConfig) error {
// Load the max & guaranteed resources for all but the root queue
if sq.Name != configs.RootQueue {
- if err = sq.setResources(conf.Resources); err != nil {
+ if err = sq.setResourcesFromConf(conf.Resources); err != nil {
return err
}
}
@@ -366,8 +365,8 @@ func (sq *Queue) applyConf(conf configs.QueueConfig) error {
return nil
}
-// setResources sets the maxResource and guaranteedResource of the queue from
the config.
-func (sq *Queue) setResources(resource configs.Resources) error {
+// setResourcesFromConf sets the maxResource and guaranteedResource of the
queue from the config.
+func (sq *Queue) setResourcesFromConf(resource configs.Resources) error {
maxResource, err := resources.NewResourceFromConf(resource.Max)
if err != nil {
log.Log(log.SchedQueue).Error("parsing failed on max resources
this should not happen",
@@ -385,6 +384,11 @@ func (sq *Queue) setResources(resource configs.Resources) error {
return err
}
+ sq.setResources(guaranteedResource, maxResource)
+ return nil
+}
+
+func (sq *Queue) setResources(guaranteedResource, maxResource
*resources.Resource) {
switch {
case resources.StrictlyGreaterThanZero(maxResource):
log.Log(log.SchedQueue).Debug("setting max resources",
@@ -436,7 +440,12 @@ func (sq *Queue) setResources(resource configs.Resources) error {
log.Log(log.SchedQueue).Debug("guaranteed resources setting
ignored: cannot set zero guaranteed resources",
zap.String("queue", sq.QueuePath))
}
- return nil
+}
+
+func (sq *Queue) SetResources(guaranteedResource, maxResource
*resources.Resource) {
+ sq.Lock()
+ defer sq.Unlock()
+ sq.setResources(guaranteedResource, maxResource)
}
// setTemplate sets the template on the queue based on the config.
@@ -737,63 +746,6 @@ func (sq *Queue) AddApplication(app *Application) {
appID := app.ApplicationID
sq.applications[appID] = app
sq.queueEvents.sendNewApplicationEvent(sq.QueuePath, appID)
- // YUNIKORN-199: update the quota from the namespace
- // get the tag with the quota
- quota := app.GetTag(siCommon.AppTagNamespaceResourceQuota)
- // get the tag with the guaranteed resource
- guaranteed := app.GetTag(siCommon.AppTagNamespaceResourceGuaranteed)
- if quota == "" && guaranteed == "" {
- return
- }
-
- var quotaRes, guaranteedRes *resources.Resource
- var quotaErr, guaranteedErr error
-
- // need to set a quota: convert json string to resource
- if quota != "" {
- quotaRes, quotaErr = resources.NewResourceFromString(quota)
- if quotaErr != nil {
- log.Log(log.SchedQueue).Warn("application resource
quota conversion failure",
- zap.String("json quota string", quota),
- zap.Error(quotaErr))
- } else if !resources.StrictlyGreaterThanZero(quotaRes) {
- log.Log(log.SchedQueue).Warn("Max resource quantities
should be greater than zero: cannot set queue max resource",
- zap.Stringer("maxResource", quotaRes))
- quotaRes = nil // Skip setting quota if it has a value
<= 0
- }
- }
-
- // need to set guaranteed resource: convert json string to resource
- if guaranteed != "" {
- guaranteedRes, guaranteedErr =
resources.NewResourceFromString(guaranteed)
- if guaranteedErr != nil {
- log.Log(log.SchedQueue).Warn("application guaranteed
resource conversion failure",
- zap.String("json guaranteed string",
guaranteed),
- zap.Error(guaranteedErr))
- if quotaErr != nil {
- return
- }
- } else if !resources.StrictlyGreaterThanZero(guaranteedRes) {
- log.Log(log.SchedQueue).Warn("Guaranteed resource
quantities should be greater than zero: cannot set queue guaranteed resource",
- zap.Stringer("guaranteedResource",
guaranteedRes))
- guaranteedRes = nil // Skip setting guaranteed resource
if it has a value <= 0
- }
- }
-
- // set the quota
- if sq.isManaged {
- log.Log(log.SchedQueue).Warn("Trying to set max resources on a
queue that is not an unmanaged leaf",
- zap.String("queueName", sq.QueuePath))
- return
- }
-
- if quotaRes != nil {
- sq.maxResource = quotaRes
- }
-
- if guaranteedRes != nil {
- sq.guaranteedResource = guaranteedRes
- }
}
// RemoveApplication removes the app from the list of tracked applications.
Make sure that the app
diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go
index 0ff14212..0cd30393 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -415,136 +415,6 @@ func TestAddApplication(t *testing.T) {
assert.Equal(t, len(leaf.applications), 1, "Application was not
replaced in the queue as expected")
}
-func TestAddApplicationWithTag(t *testing.T) {
- // create the root
- root, err := createRootQueue(nil)
- assert.NilError(t, err, "queue create failed")
- // only need to test leaf queues as we will never add an app to a parent
- var leaf, leafUn *Queue
- leaf, err = createManagedQueue(root, "leaf-man", false, nil)
- assert.NilError(t, err, "failed to create managed leaf queue")
- leafUn, err = createDynamicQueue(root, "leaf-unman", false)
- assert.NilError(t, err, "failed to create Dynamic leaf queue")
- app := newApplication(appID1, "default", "root.leaf-man")
-
- // adding the app to managed/Dynamic queue must not update queue
settings, works
- leaf.AddApplication(app)
- assert.Equal(t, len(leaf.applications), 1, "Application was not added
to the managed queue as expected")
- if leaf.GetMaxResource() != nil {
- t.Errorf("Max resources should not be set on managed queue got:
%s", leaf.GetMaxResource().String())
- }
- app = newApplication("app-2", "default", "root.leaf-un")
- leafUn.AddApplication(app)
- assert.Equal(t, len(leafUn.applications), 1, "Application was not added
to the Dynamic queue as expected")
- if leafUn.GetMaxResource() != nil {
- t.Errorf("Max resources should not be set on Dynamic queue got:
%s", leafUn.GetMaxResource().String())
- }
-
- maxRes := resources.NewResourceFromMap(
- map[string]resources.Quantity{
- "first": 10,
- })
- guaranteedRes := resources.NewResourceFromMap(
- map[string]resources.Quantity{
- "first": 5,
- })
- tags := make(map[string]string)
- tags[siCommon.AppTagNamespaceResourceQuota] =
"{\"resources\":{\"first\":{\"value\":10}}}"
- tags[siCommon.AppTagNamespaceResourceGuaranteed] =
"{\"resources\":{\"first\":{\"value\":5}}}"
- // add apps again now with the tag set
- app = newApplicationWithTags("app-3", "default", "root.leaf-man", tags)
- leaf.AddApplication(app)
- assert.Equal(t, len(leaf.applications), 2, "Application was not added
to the managed queue as expected")
- if leaf.GetMaxResource() != nil {
- t.Errorf("Max resources should not be set on managed queue got:
%s", leaf.GetMaxResource().String())
- }
- if leaf.GetGuaranteedResource() != nil {
- t.Errorf("Guaranteed resources should not be set on managed
queue got: %s", leaf.GetGuaranteedResource().String())
- }
-
- app = newApplicationWithTags("app-4", "default", "root.leaf-un", tags)
- leafUn.AddApplication(app)
- assert.Equal(t, len(leafUn.applications), 2, "Application was not added
to the Dynamic queue as expected")
- if !resources.Equals(leafUn.GetMaxResource(), maxRes) {
- t.Errorf("Max resources not set as expected: %s got: %v",
maxRes.String(), leafUn.GetMaxResource())
- }
- if !resources.Equals(leafUn.GetGuaranteedResource(), guaranteedRes) {
- t.Errorf("Guaranteed resources not set as expected: %s got:
%v", guaranteedRes.String(), leafUn.GetGuaranteedResource())
- }
-
- // set max to illegal limit (0 value), but guarantee not 0
- tags[siCommon.AppTagNamespaceResourceQuota] = ZeroResource
-
- app = newApplicationWithTags("app-5", "default", "root.leaf-un", tags)
- leafUn.AddApplication(app)
- assert.Equal(t, len(leafUn.applications), 3, "Application was not added
to the Dynamic queue as expected")
- if !resources.Equals(leafUn.GetMaxResource(), maxRes) {
- t.Errorf("Max resources not set as expected: %s got: %v",
maxRes.String(), leafUn.GetMaxResource())
- }
- if !resources.Equals(leafUn.GetGuaranteedResource(), guaranteedRes) {
- t.Errorf("Guaranteed resources not set as expected: %s got:
%v", guaranteedRes.String(), leafUn.GetGuaranteedResource())
- }
-
- // set guaranteed resource to illegal limit (0 value), but max resource
not 0
- tags[siCommon.AppTagNamespaceResourceGuaranteed] = ZeroResource
- tags[siCommon.AppTagNamespaceResourceQuota] =
"{\"resources\":{\"first\":{\"value\":100}}}"
-
- app = newApplicationWithTags("app-6", "default", "root.leaf-un", tags)
- leafUn.AddApplication(app)
- assert.Equal(t, len(leafUn.applications), 4, "Application was not added
to the Dynamic queue as expected")
- if !resources.Equals(leafUn.GetMaxResource(),
resources.NewResourceFromMap(
- map[string]resources.Quantity{
- "first": 100,
- })) {
- t.Errorf("Max resources not set as expected: %s got: %v",
resources.NewResourceFromMap(
- map[string]resources.Quantity{
- "first": 100,
- }).String(), leafUn.GetMaxResource())
- }
- if !resources.Equals(leafUn.GetGuaranteedResource(), guaranteedRes) {
- t.Errorf("Guaranteed resources not set as expected: %s got:
%v", guaranteedRes.String(), leafUn.GetGuaranteedResource())
- }
-
- // set guaranteed resource tag to "", but max resource not ""
- tags[siCommon.AppTagNamespaceResourceGuaranteed] = ""
- tags[siCommon.AppTagNamespaceResourceQuota] =
"{\"resources\":{\"first\":{\"value\":10}}}"
- app = newApplicationWithTags("app-7", "default", "root.leaf-un", tags)
- leafUn.AddApplication(app)
- assert.Equal(t, len(leafUn.applications), 5, "Application was not added
to the Dynamic queue as expected")
- if !resources.Equals(leafUn.GetMaxResource(), maxRes) {
- t.Errorf("Max resources not set as expected: %s got: %v",
maxRes.String(), leafUn.GetMaxResource())
- }
- if !resources.Equals(leafUn.GetGuaranteedResource(), guaranteedRes) {
- t.Errorf("Guaranteed resources not set as expected: %s got:
%v", guaranteedRes.String(), leafUn.GetGuaranteedResource())
- }
-
- // set both to "" , the max resource and guaranteed resource will not
update new value
- tags[siCommon.AppTagNamespaceResourceGuaranteed] = ""
- tags[siCommon.AppTagNamespaceResourceQuota] = ""
- app = newApplicationWithTags("app-8", "default", "root.leaf-un", tags)
- leafUn.AddApplication(app)
- assert.Equal(t, len(leafUn.applications), 6, "Application was not added
to the Dynamic queue as expected")
- if !resources.Equals(leafUn.GetMaxResource(), maxRes) {
- t.Errorf("Max resources not set as expected: %s got: %v",
maxRes.String(), leafUn.GetMaxResource())
- }
- if !resources.Equals(leafUn.GetGuaranteedResource(), guaranteedRes) {
- t.Errorf("Guaranteed resources not set as expected: %s got:
%v", guaranteedRes.String(), leafUn.GetGuaranteedResource())
- }
-
- // set both to 0 , the max resource and guaranteed resource will not
update new value
- tags[siCommon.AppTagNamespaceResourceGuaranteed] = ZeroResource
- tags[siCommon.AppTagNamespaceResourceQuota] = ZeroResource
- app = newApplicationWithTags("app-9", "default", "root.leaf-un", tags)
- leafUn.AddApplication(app)
- assert.Equal(t, len(leafUn.applications), 7, "Application was not added
to the Dynamic queue as expected")
- if !resources.Equals(leafUn.GetMaxResource(), maxRes) {
- t.Errorf("Max resources not set as expected: %s got: %v",
maxRes.String(), leafUn.GetMaxResource())
- }
- if !resources.Equals(leafUn.GetGuaranteedResource(), guaranteedRes) {
- t.Errorf("Guaranteed resources not set as expected: %s got:
%v", guaranteedRes.String(), leafUn.GetGuaranteedResource())
- }
-}
-
func TestRemoveApplication(t *testing.T) {
// create the root
root, err := createRootQueue(map[string]string{"first": "100"})
@@ -1691,7 +1561,7 @@ func TestSetResources(t *testing.T) {
maxResource := getResourceConf()
// case 0: normal case
- err = queue.setResources(configs.Resources{
+ err = queue.setResourcesFromConf(configs.Resources{
Guaranteed: guaranteedResource,
Max: maxResource,
})
@@ -1707,7 +1577,7 @@ func TestSetResources(t *testing.T) {
// case 1: empty resource would set the queue resources to 'nil' if it
has been set already
var nilResource *resources.Resource = nil
- err = queue.setResources(configs.Resources{
+ err = queue.setResourcesFromConf(configs.Resources{
Guaranteed: make(map[string]string),
Max: make(map[string]string),
})
@@ -1716,7 +1586,7 @@ func TestSetResources(t *testing.T) {
assert.DeepEqual(t, queue.maxResource, nilResource)
// case 2: zero resource won't change the queue resources as it is
'nil' already
- err = queue.setResources(configs.Resources{
+ err = queue.setResourcesFromConf(configs.Resources{
Guaranteed: getZeroResourceConf(),
Max: getZeroResourceConf(),
})
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 41b20e0d..992efade 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -339,8 +339,13 @@ func (pc *PartitionContext) AddApplication(app *objects.Application) error {
return fmt.Errorf("failed to find queue %s for application %s",
queueName, appID)
}
- // add the app to the queue to set the quota on the queue if needed
- queue.AddApplication(app)
+ // set resources based on tags, but only if the queue is dynamic
(unmanaged)
+ if queue.IsManaged() {
+ log.Log(log.SchedQueue).Warn("Trying to set resources on a
queue that is not an unmanaged leaf",
+ zap.String("queueName", queue.QueuePath))
+ } else {
+ queue.SetResources(app.GetGuaranteedResource(),
app.GetMaxResource())
+ }
// check only for gang request
// - make sure the taskgroup request fits in the maximum set for the
queue hierarchy
// - task groups should only be used in FIFO queues
@@ -348,21 +353,18 @@ func (pc *PartitionContext) AddApplication(app *objects.Application) error {
if placeHolder := app.GetPlaceholderAsk();
!resources.IsZero(placeHolder) {
// check the queue sorting
if !queue.SupportTaskGroup() {
- queue.RemoveApplication(app)
return fmt.Errorf("queue %s cannot run application %s
with task group request: unsupported sort type", queueName, appID)
}
- // retrieve the max set
if maxQueue := queue.GetMaxQueueSet(); maxQueue != nil {
if !maxQueue.FitInMaxUndef(placeHolder) {
- queue.RemoveApplication(app)
return fmt.Errorf("queue %s cannot fit
application %s: task group request %s larger than max queue allocation %s",
queueName, appID, placeHolder.String(), maxQueue.String())
}
}
}
-
// all is OK update the app and add it to the partition
app.SetQueue(queue)
app.SetTerminatedCallback(pc.moveTerminatedApp)
+ queue.AddApplication(app)
pc.applications[appID] = app
return nil
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 7b4d0583..e8f0e001 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -2807,11 +2807,20 @@ func TestAddTGApplication(t *testing.T) {
var tgRes *resources.Resource
tgRes, err = resources.NewResourceFromConf(map[string]string{"vcore":
"10"})
assert.NilError(t, err, "failed to create resource")
- app := newApplicationTG(appID1, "default", "root.limited", tgRes)
+ tags := map[string]string{
+ siCommon.AppTagNamespaceResourceGuaranteed:
"{\"resources\":{\"vcore\":{\"value\":111}}}",
+ siCommon.AppTagNamespaceResourceQuota:
"{\"resources\":{\"vcore\":{\"value\":2222}}}",
+ }
+ app := newApplicationTGTags(appID1, "default", "root.limited", tgRes,
tags)
err = partition.AddApplication(app)
if err == nil {
t.Error("app-1 should be rejected due to TG request")
}
+ queue := partition.GetQueue("root.limited")
+ assert.Assert(t, resources.Equals(queue.GetMaxResource(),
resources.NewResourceFromMap(map[string]resources.Quantity{
+ "vcore": 1000,
+ })), "max resource changed unexpectedly")
+ assert.Assert(t, queue.GetGuaranteedResource() == nil)
// add a app with TG that does fit in the queue
limit = map[string]string{"vcore": "100"}
@@ -2820,6 +2829,11 @@ func TestAddTGApplication(t *testing.T) {
err = partition.AddApplication(app)
assert.NilError(t, err, "app-1 should have been added to the partition")
assert.Equal(t, partition.getApplication(appID1), app, "partition
failed to add app incorrect app returned")
+ queue = partition.GetQueue("root.limited")
+ assert.Assert(t, resources.Equals(queue.GetMaxResource(),
resources.NewResourceFromMap(map[string]resources.Quantity{
+ "vcore": 100000,
+ })), "max resource changed unexpectedly")
+ assert.Assert(t, queue.GetGuaranteedResource() == nil)
// add a app with TG that does fit in the queue as the resource is not
limited in the queue
limit = map[string]string{"second": "100"}
@@ -2828,6 +2842,11 @@ func TestAddTGApplication(t *testing.T) {
err = partition.AddApplication(app)
assert.NilError(t, err, "app-1 should have been added to the partition")
assert.Equal(t, partition.getApplication(appID1), app, "partition
failed to add app incorrect app returned")
+ queue = partition.GetQueue("root.limited")
+ assert.Assert(t, resources.Equals(queue.GetMaxResource(),
resources.NewResourceFromMap(map[string]resources.Quantity{
+ "second": 100,
+ })), "max resource changed unexpectedly")
+ assert.Assert(t, queue.GetGuaranteedResource() == nil)
}
func TestAddTGAppDynamic(t *testing.T) {
@@ -2843,16 +2862,24 @@ func TestAddTGAppDynamic(t *testing.T) {
assert.NilError(t, err, "app-1 should have been added to the partition")
assert.Equal(t, app.GetQueuePath(), "root.unlimited", "app-1 not placed
in expected queue")
- jsonRes := "{\"resources\":{\"vcore\":{\"value\":10000}}}"
- tags = map[string]string{"taskqueue": "same",
siCommon.AppTagNamespaceResourceQuota: jsonRes}
+ jsonMaxRes := "{\"resources\":{\"vcore\":{\"value\":10000}}}"
+ tags = map[string]string{"taskqueue": "same",
siCommon.AppTagNamespaceResourceQuota: jsonMaxRes}
app = newApplicationTGTags(appID2, "default", "unknown", tgRes, tags)
err = partition.AddApplication(app)
assert.NilError(t, err, "app-2 should have been added to the partition")
assert.Equal(t, partition.getApplication(appID2), app, "partition
failed to add app incorrect app returned")
assert.Equal(t, app.GetQueuePath(), "root.same", "app-2 not placed in
expected queue")
-
- jsonRes = "{\"resources\":{\"vcore\":{\"value\":1000}}}"
- tags = map[string]string{"taskqueue": "smaller",
siCommon.AppTagNamespaceResourceQuota: jsonRes}
+ queue := partition.GetQueue("root.same")
+ assert.Assert(t, queue.GetGuaranteedResource() == nil, "guaranteed
resource should be unset")
+ maxRes := queue.GetMaxResource()
+ assert.Assert(t, maxRes != nil, "maximum resource should have been set")
+ assert.Assert(t, resources.Equals(maxRes,
resources.NewResourceFromMap(map[string]resources.Quantity{
+ "vcore": 10000,
+ })), "max resource set on the queue does not match the JSON tag")
+
+ jsonMaxRes = "{\"resources\":{\"vcore\":{\"value\":1000}}}"
+ jsonGuaranteedRes := "{\"resources\":{\"vcore\":{\"value\":111}}}"
+ tags = map[string]string{"taskqueue": "smaller",
siCommon.AppTagNamespaceResourceQuota: jsonMaxRes,
siCommon.AppTagNamespaceResourceGuaranteed: jsonGuaranteedRes}
app = newApplicationTGTags(appID3, "default", "unknown", tgRes, tags)
err = partition.AddApplication(app)
if err == nil {
@@ -2861,10 +2888,20 @@ func TestAddTGAppDynamic(t *testing.T) {
if partition.getApplication(appID3) != nil {
t.Fatal("partition added app incorrectly should have failed")
}
- queue := partition.GetQueue("root.smaller")
+ queue = partition.GetQueue("root.smaller")
if queue == nil {
t.Fatal("queue should have been added, even if app failed")
}
+ maxRes = queue.GetMaxResource()
+ assert.Assert(t, maxRes != nil, "maximum resource should have been set")
+ assert.Assert(t, resources.Equals(maxRes,
resources.NewResourceFromMap(map[string]resources.Quantity{
+ "vcore": 1000,
+ })), "max resource set on the queue does not match the JSON tag")
+ guaranteedRes := queue.GetGuaranteedResource()
+ assert.Assert(t, guaranteedRes != nil, "guaranteed resource should have
been set")
+ assert.Assert(t, resources.Equals(guaranteedRes,
resources.NewResourceFromMap(map[string]resources.Quantity{
+ "vcore": 111,
+ })), "guaranteed resource set on the queue does not match the JSON tag")
}
func TestPlaceholderSmallerThanReal(t *testing.T) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]