This is an automated email from the ASF dual-hosted git repository.

zhuqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new e40f1dd7 [YUNIKORN-1697][core] Make namespace annotation to support max applications update. (#927)
e40f1dd7 is described below

commit e40f1dd78cc60515b4ef8dfb65057fbac8d0d4f1
Author: qzhu <[email protected]>
AuthorDate: Wed Aug 14 17:13:55 2024 +0800

    [YUNIKORN-1697][core] Make namespace annotation to support max applications update. (#927)
    
    fix golint
    Address new comments
    Fix test
    fix logic
    Increase test coverage
    Split testing
    Fix unit test
    fix
    increase coverage
    Merge remote-tracking branch 'upstream/master' into YUNIKORN-1697
    Add test coverage
    Address new comments
    Merge remote-tracking branch 'upstream/master' into YUNIKORN-1697
    Address new comments
    Merge remote-tracking branch 'upstream/master' into YUNIKORN-1697
    
    Closes: #927
    
    Signed-off-by: qzhu <[email protected]>
---
 pkg/scheduler/objects/application.go      |  18 +++++
 pkg/scheduler/objects/application_test.go | 108 +++++++++++++++++++++++++++++-
 pkg/scheduler/objects/queue.go            |  34 +++++-----
 pkg/scheduler/objects/queue_test.go       |  16 +++++
 pkg/scheduler/partition.go                |  10 ++-
 pkg/scheduler/partition_test.go           |  19 +++++-
 pkg/scheduler/utilities_test.go           |   1 +
 7 files changed, 184 insertions(+), 22 deletions(-)

diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index 504b6b0d..350abf3a 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -22,6 +22,7 @@ import (
        "context"
        "fmt"
        "math"
+       "strconv"
        "strings"
        "sync"
        "time"
@@ -2157,6 +2158,23 @@ func (sa *Application) GetMaxResource() 
*resources.Resource {
        return sa.getResourceFromTags(siCommon.AppTagNamespaceResourceQuota)
 }
 
+// GetMaxApps returns the max apps that is set in the application tags
+func (sa *Application) GetMaxApps() uint64 {
+       return sa.getUint64Tag(siCommon.AppTagNamespaceResourceMaxApps)
+}
+
+func (sa *Application) getUint64Tag(tag string) uint64 {
+       uintValue, err := strconv.ParseUint(sa.GetTag(tag), 10, 64)
+       if err != nil {
+               log.Log(log.SchedApplication).Warn("application tag conversion 
failure",
+                       zap.String("tag", tag),
+                       zap.String("json string", sa.GetTag(tag)),
+                       zap.Error(err))
+               return 0
+       }
+       return uintValue
+}
+
 func (sa *Application) getResourceFromTags(tag string) *resources.Resource {
        value := sa.GetTag(tag)
        if value == "" {
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 3d2008f4..9dfcdc56 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -108,46 +108,120 @@ func TestNewApplication(t *testing.T) {
        assert.Assert(t, app.IsNew(), "new application must be in new state")
        assert.Equal(t, app.execTimeout, defaultPlaceholderTimeout, "no timeout 
passed in should be modified default")
        assert.Assert(t, resources.Equals(app.placeholderAsk, res), 
"placeholder ask not set as expected")
+}
 
+func TestNewApplicationWithAnnotationUpdate(t *testing.T) {
+       user := security.UserGroup{
+               User:   "testuser",
+               Groups: []string{},
+       }
        // valid tags
-       siApp = &si.AddApplicationRequest{}
+       siApp := &si.AddApplicationRequest{}
        siApp.Tags = map[string]string{
                siCommon.AppTagNamespaceResourceQuota:      
"{\"resources\":{\"validMaxRes\":{\"value\":11}}}",
                siCommon.AppTagNamespaceResourceGuaranteed: 
"{\"resources\":{\"validGuaranteed\":{\"value\":22}}}",
+               siCommon.AppTagNamespaceResourceMaxApps:    "33",
        }
-       app = NewApplication(siApp, user, nil, "")
+
+       app := NewApplication(siApp, user, nil, "")
+
        guaranteed := app.GetGuaranteedResource()
        maxResource := app.GetMaxResource()
+       maxApps := app.GetMaxApps()
        assert.Assert(t, guaranteed != nil, "guaranteed resource has not been 
set")
        assert.Equal(t, 1, len(guaranteed.Resources), "more than one resource 
has been set")
        assert.Equal(t, resources.Quantity(22), 
guaranteed.Resources["validGuaranteed"])
        assert.Assert(t, maxResource != nil, "maximum resource has not been 
set")
        assert.Equal(t, 1, len(maxResource.Resources), "more than one resource 
has been set")
        assert.Equal(t, resources.Quantity(11), 
maxResource.Resources["validMaxRes"], "maximum resource is incorrect")
+       assert.Assert(t, maxApps != 0, "maximum apps has not been set or 
incorrect")
+       assert.Equal(t, uint64(33), maxApps, "maximum apps is incorrect")
+
+       // valid tags without max apps
+       siApp = &si.AddApplicationRequest{}
+       siApp.Tags = map[string]string{
+               siCommon.AppTagNamespaceResourceQuota:      
"{\"resources\":{\"validMaxRes\":{\"value\":11}}}",
+               siCommon.AppTagNamespaceResourceGuaranteed: 
"{\"resources\":{\"validGuaranteed\":{\"value\":22}}}",
+       }
+       app = NewApplication(siApp, user, nil, "")
+       guaranteed = app.GetGuaranteedResource()
+       maxResource = app.GetMaxResource()
+       maxApps = app.GetMaxApps()
+       assert.Assert(t, guaranteed != nil, "guaranteed resource has not been 
set")
+       assert.Equal(t, 1, len(guaranteed.Resources), "more than one resource 
has been set")
+       assert.Equal(t, resources.Quantity(22), 
guaranteed.Resources["validGuaranteed"])
+       assert.Assert(t, maxResource != nil, "maximum resource has not been 
set")
+       assert.Equal(t, 1, len(maxResource.Resources), "more than one resource 
has been set")
+       assert.Equal(t, resources.Quantity(11), 
maxResource.Resources["validMaxRes"], "maximum resource is incorrect")
+       assert.Assert(t, maxApps == 0, "maximum apps should have not been set")
 
        // invalid tags
        siApp = &si.AddApplicationRequest{}
        siApp.Tags = map[string]string{
                siCommon.AppTagNamespaceResourceQuota:      "{xxxxxx}",
                siCommon.AppTagNamespaceResourceGuaranteed: "{yyyyy}",
+               siCommon.AppTagNamespaceResourceMaxApps:    "zzzzz",
        }
        app = NewApplication(siApp, user, nil, "")
        guaranteed = app.GetGuaranteedResource()
        maxResource = app.GetMaxResource()
+       maxApps = app.GetMaxApps()
        assert.Assert(t, guaranteed == nil, "guaranteed resource should have 
not been set")
        assert.Assert(t, maxResource == nil, "maximum resource should have not 
been set")
+       assert.Assert(t, maxApps == 0, "maximum apps should have not been set 
or incorrect")
 
        // negative values
        siApp = &si.AddApplicationRequest{}
        siApp.Tags = map[string]string{
                siCommon.AppTagNamespaceResourceQuota:      
"{\"resources\":{\"negativeMax\":{\"value\":-11}}}",
                siCommon.AppTagNamespaceResourceGuaranteed: 
"{\"resources\":{\"negativeGuaranteed\":{\"value\":-22}}}",
+               siCommon.AppTagNamespaceResourceMaxApps:    "-33",
        }
        app = NewApplication(siApp, user, nil, "")
        guaranteed = app.GetGuaranteedResource()
        maxResource = app.GetMaxResource()
+       maxApps = app.GetMaxApps()
        assert.Assert(t, guaranteed == nil, "guaranteed resource should have 
not been set")
        assert.Assert(t, maxResource == nil, "maximum resource should have not 
been set")
+       assert.Assert(t, maxApps == 0, "maximum apps should have not been set 
or incorrect")
+
+       // valid max apps
+       siApp = &si.AddApplicationRequest{}
+       siApp.Tags = map[string]string{
+               siCommon.AppTagNamespaceResourceMaxApps: "33",
+       }
+       app = NewApplication(siApp, user, nil, "")
+       maxApps = app.GetMaxApps()
+       guaranteed = app.GetGuaranteedResource()
+       maxResource = app.GetMaxResource()
+       assert.Assert(t, guaranteed == nil, "guaranteed resource should have 
not been set")
+       assert.Assert(t, maxResource == nil, "maximum resource should have not 
been set")
+       assert.Assert(t, maxApps != 0, "maximum apps has not been set or 
incorrect")
+       assert.Equal(t, uint64(33), maxApps, "maximum apps is incorrect")
+
+       // invalid max apps
+       siApp = &si.AddApplicationRequest{}
+       siApp.Tags = map[string]string{
+               siCommon.AppTagNamespaceResourceMaxApps: "zzzzz",
+       }
+       app = NewApplication(siApp, user, nil, "")
+       maxApps = app.GetMaxApps()
+       maxResource = app.GetMaxResource()
+       assert.Assert(t, guaranteed == nil, "guaranteed resource should have 
not been set")
+       assert.Assert(t, maxResource == nil, "maximum resource should have not 
been set")
+       assert.Assert(t, maxApps == 0, "maximum apps should have not been set 
or incorrect")
+
+       // negative max apps
+       siApp = &si.AddApplicationRequest{}
+       siApp.Tags = map[string]string{
+               siCommon.AppTagNamespaceResourceMaxApps: "-33",
+       }
+       app = NewApplication(siApp, user, nil, "")
+       maxApps = app.GetMaxApps()
+       maxResource = app.GetMaxResource()
+       assert.Assert(t, guaranteed == nil, "guaranteed resource should have 
not been set")
+       assert.Assert(t, maxResource == nil, "maximum resource should have not 
been set")
+       assert.Assert(t, maxApps == 0, "maximum apps should have not been set 
or incorrect")
 }
 
 // test basic reservations
@@ -2799,3 +2873,33 @@ func (sa *Application) resetAppEvents() {
        defer sa.Unlock()
        sa.appEvents = schedEvt.NewApplicationEvents(events.GetEventSystem())
 }
+
+func TestGetUint64Tag(t *testing.T) {
+       app := &Application{
+               tags: map[string]string{
+                       "validUintTag":    "12345",
+                       "negativeUintTag": "-12345",
+                       "invalidUintTag":  "not-a-number",
+                       "emptyUintTag":    "",
+               },
+       }
+
+       tests := []struct {
+               name     string
+               tag      string
+               expected uint64
+       }{
+               {"Valid uint64 tag", "validUintTag", uint64(12345)},
+               {"Negative uint64 tag", "negativeUintTag", uint64(0)},
+               {"Invalid uint64 tag", "invalidUintTag", uint64(0)},
+               {"Empty tag", "emptyUintTag", uint64(0)},
+               {"Non-existent tag", "nonExistentTag", uint64(0)},
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := app.getUint64Tag(tt.tag)
+                       assert.Equal(t, tt.expected, result)
+               })
+       }
+}
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 71b12e80..cb11c903 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -388,7 +388,6 @@ func (sq *Queue) setResourcesFromConf(resource 
configs.Resources) error {
                        zap.Error(err))
                return err
        }
-
        sq.setResources(guaranteedResource, maxResource)
        return nil
 }
@@ -453,6 +452,16 @@ func (sq *Queue) SetResources(guaranteedResource, 
maxResource *resources.Resourc
        sq.setResources(guaranteedResource, maxResource)
 }
 
+// SetMaxRunningApps allows setting the maximum running apps on a queue
+func (sq *Queue) SetMaxRunningApps(maxApps uint64) {
+       if sq == nil {
+               return
+       }
+       sq.Lock()
+       defer sq.Unlock()
+       sq.maxRunningApps = maxApps
+}
+
 // setTemplate sets the template on the queue based on the config.
 // lock free call, must be called holding the queue lock or during create only
 func (sq *Queue) setTemplate(conf configs.ChildTemplate) error {
@@ -601,7 +610,14 @@ func (sq *Queue) GetGuaranteedResource() 
*resources.Resource {
        return sq.guaranteedResource
 }
 
-// GetActualGuaranteedResource returns the actual (including parent) 
guaranteed resources for the queue.
+// GetMaxApps returns the maximum number of applications that can run in this 
queue.
+func (sq *Queue) GetMaxApps() uint64 {
+       sq.RLock()
+       defer sq.RUnlock()
+       return sq.maxRunningApps
+}
+
+// GetActualGuaranteedResource returns the actual (including parent) guaranteed resources for the queue.
 func (sq *Queue) GetActualGuaranteedResource() *resources.Resource {
        if sq == nil {
                return resources.NewResource()
@@ -1683,20 +1699,6 @@ func (sq *Queue) GetPreemptionPolicy() 
policies.PreemptionPolicy {
        return sq.preemptionPolicy
 }
 
-// SetMaxRunningApps allows setting the maximum running apps on a queue
-// test only
-func (sq *Queue) SetMaxRunningApps(max int) {
-       if sq == nil {
-               return
-       }
-       if max < 0 {
-               return
-       }
-       sq.Lock()
-       defer sq.Unlock()
-       sq.maxRunningApps = uint64(max)
-}
-
 // FindEligiblePreemptionVictims is used to locate tasks which may be 
preempted for the given ask.
 // queuePath is the fully-qualified path of the queue where ask resides
 // ask is the ask we are attempting to preempt for
diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go
index cd0700ce..c0792d50 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -2611,6 +2611,22 @@ func TestQueue_allocatedResFits_Root(t *testing.T) {
        }
 }
 
+func TestQueueSetMaxRunningApps(t *testing.T) {
+       defer func() {
+               if r := recover(); r != nil {
+                       t.Fatal("panic on nil queue setMaxRunningApps")
+               }
+       }()
+       queue := &Queue{}
+       maxApps := uint64(10)
+
+       queue.SetMaxRunningApps(maxApps)
+       assert.Equal(t, maxApps, queue.maxRunningApps)
+
+       queue = nil
+       queue.SetMaxRunningApps(maxApps)
+}
+
 func TestQueue_allocatedResFits_Other(t *testing.T) {
        const first = "first"
        const second = "second"
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index a614ee4a..3a261822 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -342,13 +342,19 @@ func (pc *PartitionContext) AddApplication(app 
*objects.Application) error {
 
        guaranteedRes := app.GetGuaranteedResource()
        maxRes := app.GetMaxResource()
-       if !isRecoveryQueue && (guaranteedRes != nil || maxRes != nil) {
+       maxApps := app.GetMaxApps()
+       if !isRecoveryQueue && (guaranteedRes != nil || maxRes != nil || 
maxApps != 0) {
                // set resources based on tags, but only if the queue is 
dynamic (unmanaged)
                if queue.IsManaged() {
                        log.Log(log.SchedQueue).Warn("Trying to set resources 
on a queue that is not an unmanaged leaf",
                                zap.String("queueName", queue.QueuePath))
                } else {
-                       queue.SetResources(guaranteedRes, maxRes)
+                       if maxApps != 0 {
+                               queue.SetMaxRunningApps(maxApps)
+                       }
+                       if guaranteedRes != nil || maxRes != nil {
+                               queue.SetResources(guaranteedRes, maxRes)
+                       }
                }
        }
 
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index f138d239..24f51fad 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -2808,6 +2808,7 @@ func TestAddTGApplication(t *testing.T) {
        tags := map[string]string{
                siCommon.AppTagNamespaceResourceGuaranteed: 
"{\"resources\":{\"vcore\":{\"value\":111}}}",
                siCommon.AppTagNamespaceResourceQuota:      
"{\"resources\":{\"vcore\":{\"value\":2222}}}",
+               siCommon.AppTagNamespaceResourceMaxApps:    "1",
        }
        app := newApplicationTGTags(appID1, "default", "root.limited", tgRes, 
tags)
        err = partition.AddApplication(app)
@@ -2819,6 +2820,7 @@ func TestAddTGApplication(t *testing.T) {
                "vcore": 1000,
        })), "max resource changed unexpectedly")
        assert.Assert(t, queue.GetGuaranteedResource() == nil)
+       assert.Equal(t, queue.GetMaxApps(), uint64(2), "max running apps should 
be 2")
 
        // add a app with TG that does fit in the queue
        limit = map[string]string{"vcore": "100"}
@@ -2832,6 +2834,7 @@ func TestAddTGApplication(t *testing.T) {
                "vcore": 100000,
        })), "max resource changed unexpectedly")
        assert.Assert(t, queue.GetGuaranteedResource() == nil)
+       assert.Equal(t, queue.GetMaxApps(), uint64(2), "max running apps should 
be 2")
 
        // add a app with TG that does fit in the queue as the resource is not 
limited in the queue
        limit = map[string]string{"second": "100"}
@@ -2845,6 +2848,7 @@ func TestAddTGApplication(t *testing.T) {
                "second": 100,
        })), "max resource changed unexpectedly")
        assert.Assert(t, queue.GetGuaranteedResource() == nil)
+       assert.Equal(t, queue.GetMaxApps(), uint64(2), "max running apps should 
be 2")
 }
 
 func TestAddTGAppDynamic(t *testing.T) {
@@ -2861,7 +2865,11 @@ func TestAddTGAppDynamic(t *testing.T) {
        assert.Equal(t, app.GetQueuePath(), "root.unlimited", "app-1 not placed 
in expected queue")
 
        jsonMaxRes := "{\"resources\":{\"vcore\":{\"value\":10000}}}"
-       tags = map[string]string{"taskqueue": "same", 
siCommon.AppTagNamespaceResourceQuota: jsonMaxRes}
+       tags = map[string]string{
+               "taskqueue":                             "same",
+               siCommon.AppTagNamespaceResourceQuota:   jsonMaxRes,
+               siCommon.AppTagNamespaceResourceMaxApps: "1",
+       }
        app = newApplicationTGTags(appID2, "default", "unknown", tgRes, tags)
        err = partition.AddApplication(app)
        assert.NilError(t, err, "app-2 should have been added to the partition")
@@ -2874,10 +2882,16 @@ func TestAddTGAppDynamic(t *testing.T) {
        assert.Assert(t, resources.Equals(maxRes, 
resources.NewResourceFromMap(map[string]resources.Quantity{
                "vcore": 10000,
        })), "max resource set on the queue does not match the JSON tag")
+       assert.Equal(t, queue.GetMaxApps(), uint64(1), "max running apps should 
be 1")
 
        jsonMaxRes = "{\"resources\":{\"vcore\":{\"value\":1000}}}"
        jsonGuaranteedRes := "{\"resources\":{\"vcore\":{\"value\":111}}}"
-       tags = map[string]string{"taskqueue": "smaller", 
siCommon.AppTagNamespaceResourceQuota: jsonMaxRes, 
siCommon.AppTagNamespaceResourceGuaranteed: jsonGuaranteedRes}
+       tags = map[string]string{
+               "taskqueue":                                "smaller",
+               siCommon.AppTagNamespaceResourceQuota:      jsonMaxRes,
+               siCommon.AppTagNamespaceResourceGuaranteed: jsonGuaranteedRes,
+               siCommon.AppTagNamespaceResourceMaxApps:    "1",
+       }
        app = newApplicationTGTags(appID3, "default", "unknown", tgRes, tags)
        err = partition.AddApplication(app)
        if err == nil {
@@ -2900,6 +2914,7 @@ func TestAddTGAppDynamic(t *testing.T) {
        assert.Assert(t, resources.Equals(guaranteedRes, 
resources.NewResourceFromMap(map[string]resources.Quantity{
                "vcore": 111,
        })), "guaranteed resource set on the queue does not match the JSON tag")
+       assert.Equal(t, queue.GetMaxApps(), uint64(1), "max running apps should 
be 1")
 }
 
 func TestPlaceholderSmallerThanReal(t *testing.T) {
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index 96e6d8e5..ea1192cc 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -392,6 +392,7 @@ func newLimitedPartition(resLimit map[string]string) 
(*PartitionContext, error)
                                                Resources: configs.Resources{
                                                        Max: resLimit,
                                                },
+                                               MaxApplications: 2,
                                                Limits: []configs.Limit{
                                                        {
                                                                Limit: "limited 
queue limit",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to