This is an automated email from the ASF dual-hosted git repository.

mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d220b51 [YUNIKORN-2169] Fix queue resource update through configmaps (#723)
7d220b51 is described below

commit 7d220b519e7476c0ae17a348b54029060858346e
Author: Manikandan R <[email protected]>
AuthorDate: Wed Nov 22 19:38:34 2023 +0530

    [YUNIKORN-2169] Fix queue resource update through configmaps (#723)
    
    Closes: #723
    
    Signed-off-by: Manikandan R <[email protected]>
---
 pkg/scheduler/objects/queue.go                   |  75 +++++++++++--
 pkg/scheduler/objects/queue_test.go              |  13 +--
 pkg/scheduler/partition_test.go                  | 128 ++++++++++++++++++-----
 pkg/scheduler/tests/application_tracking_test.go |  34 +++---
 pkg/scheduler/tests/mockscheduler_test.go        |   2 +
 5 files changed, 195 insertions(+), 57 deletions(-)

diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 56339cb8..e7eb293c 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -361,6 +361,7 @@ func (sq *Queue) setResources(resource configs.Resources) error {
        maxResource, err := resources.NewResourceFromConf(resource.Max)
        if err != nil {
                log.Log(log.SchedQueue).Error("parsing failed on max resources 
this should not happen",
+                       zap.String("queue", sq.QueuePath),
                        zap.Error(err))
                return err
        }
@@ -369,30 +370,62 @@ func (sq *Queue) setResources(resource configs.Resources) error {
        guaranteedResource, err = 
resources.NewResourceFromConf(resource.Guaranteed)
        if err != nil {
                log.Log(log.SchedQueue).Error("parsing failed on guaranteed 
resources this should not happen",
+                       zap.String("queue", sq.QueuePath),
                        zap.Error(err))
                return err
        }
 
-       if resources.StrictlyGreaterThanZero(maxResource) {
+       switch {
+       case resources.StrictlyGreaterThanZero(maxResource):
+               log.Log(log.SchedQueue).Debug("setting max resources",
+                       zap.String("queue", sq.QueuePath),
+                       zap.Stringer("current", sq.maxResource),
+                       zap.Stringer("new", maxResource))
                if !resources.Equals(sq.maxResource, maxResource) && 
sq.queueEvents != nil {
                        sq.queueEvents.sendMaxResourceChangedEvent()
                }
                sq.maxResource = maxResource
                sq.updateMaxResourceMetrics()
-       } else {
-               log.Log(log.SchedQueue).Debug("max resources setting ignored: 
cannot set zero max resources")
+       case sq.maxResource != nil:
+               log.Log(log.SchedQueue).Debug("setting max resources",
+                       zap.String("queue", sq.QueuePath),
+                       zap.Stringer("current", sq.maxResource),
+                       zap.Stringer("new", maxResource))
+               if sq.queueEvents != nil {
+                       sq.queueEvents.sendMaxResourceChangedEvent()
+               }
+               sq.maxResource = nil
+               sq.updateMaxResourceMetrics()
+       default:
+               log.Log(log.SchedQueue).Warn("max resources setting ignored: 
cannot set zero max resources",
+                       zap.String("queue", sq.QueuePath))
        }
 
-       if resources.StrictlyGreaterThanZero(guaranteedResource) {
+       switch {
+       case resources.StrictlyGreaterThanZero(guaranteedResource):
+               log.Log(log.SchedQueue).Debug("setting guaranteed resources",
+                       zap.String("queue", sq.QueuePath),
+                       zap.Stringer("current", sq.guaranteedResource),
+                       zap.Stringer("new", guaranteedResource))
                if !resources.Equals(sq.guaranteedResource, guaranteedResource) 
&& sq.queueEvents != nil {
                        sq.queueEvents.sendGuaranteedResourceChangedEvent()
                }
                sq.guaranteedResource = guaranteedResource
                sq.updateGuaranteedResourceMetrics()
-       } else {
-               log.Log(log.SchedQueue).Debug("guaranteed resources setting 
ignored: cannot set zero max resources")
+       case sq.guaranteedResource != nil:
+               log.Log(log.SchedQueue).Debug("setting guaranteed resources",
+                       zap.String("queue", sq.QueuePath),
+                       zap.Stringer("current", sq.guaranteedResource),
+                       zap.Stringer("new", guaranteedResource))
+               if sq.queueEvents != nil {
+                       sq.queueEvents.sendGuaranteedResourceChangedEvent()
+               }
+               sq.guaranteedResource = nil
+               sq.updateGuaranteedResourceMetrics()
+       default:
+               log.Log(log.SchedQueue).Warn("guaranteed resources setting 
ignored: cannot set zero guaranteed resources",
+                       zap.String("queue", sq.QueuePath))
        }
-
        return nil
 }
 
@@ -1266,8 +1299,32 @@ func (sq *Queue) SetMaxResource(max *resources.Resource) {
        log.Log(log.SchedQueue).Info("updating root queue max resources",
                zap.Stringer("current max", sq.maxResource),
                zap.Stringer("new max", max))
-       sq.maxResource = max.Clone()
-       sq.updateMaxResourceMetrics()
+
+       switch {
+       case resources.StrictlyGreaterThanZero(max):
+               log.Log(log.SchedQueue).Debug("setting max resources",
+                       zap.String("queue", sq.QueuePath),
+                       zap.Stringer("current", sq.maxResource),
+                       zap.Stringer("new", max))
+               if !resources.Equals(sq.maxResource, max) && sq.queueEvents != 
nil {
+                       sq.queueEvents.sendMaxResourceChangedEvent()
+               }
+               sq.maxResource = max.Clone()
+               sq.updateMaxResourceMetrics()
+       case sq.maxResource != nil:
+               log.Log(log.SchedQueue).Debug("setting max resources",
+                       zap.String("queue", sq.QueuePath),
+                       zap.Stringer("current", sq.maxResource),
+                       zap.Stringer("new", max))
+               if sq.queueEvents != nil {
+                       sq.queueEvents.sendMaxResourceChangedEvent()
+               }
+               sq.maxResource = nil
+               sq.updateMaxResourceMetrics()
+       default:
+               log.Log(log.SchedQueue).Warn("max resources setting ignored: 
cannot set zero max resources",
+                       zap.String("queue", sq.QueuePath))
+       }
 }
 
 // canRunApp returns if the queue could run a new app for this queue 
(recursively).
diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go
index 2c6fe128..cb6613ef 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -1772,23 +1772,24 @@ func TestSetResources(t *testing.T) {
        assert.NilError(t, err, "failed to parse resource: %v", err)
        assert.DeepEqual(t, queue.maxResource, expectedMaxResource)
 
-       // case 1: empty resource won't change the resources
+       // case 1: empty resource would set the queue resources to 'nil' if it 
has been set already
+       var nilResource *resources.Resource = nil
        err = queue.setResources(configs.Resources{
                Guaranteed: make(map[string]string),
                Max:        make(map[string]string),
        })
        assert.NilError(t, err, "failed to set resources: %v", err)
-       assert.DeepEqual(t, queue.guaranteedResource, 
expectedGuaranteedResource)
-       assert.DeepEqual(t, queue.maxResource, expectedMaxResource)
+       assert.DeepEqual(t, queue.guaranteedResource, nilResource)
+       assert.DeepEqual(t, queue.maxResource, nilResource)
 
-       // case 2: zero resource won't change the resources
+       // case 2: zero resource won't change the queue resources as it is 
'nil' already
        err = queue.setResources(configs.Resources{
                Guaranteed: getZeroResourceConf(),
                Max:        getZeroResourceConf(),
        })
        assert.NilError(t, err, "failed to set resources: %v", err)
-       assert.DeepEqual(t, queue.guaranteedResource, 
expectedGuaranteedResource)
-       assert.DeepEqual(t, queue.maxResource, expectedMaxResource)
+       assert.DeepEqual(t, queue.guaranteedResource, nilResource)
+       assert.DeepEqual(t, queue.maxResource, nilResource)
 }
 
 func TestPreemptingResource(t *testing.T) {
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index e4780af8..3e8e09bb 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -1348,40 +1348,31 @@ func TestCreateDeepQueueConfig(t *testing.T) {
        assert.Equal(t, "root.level1.level2.level3.level4.level5", 
queue.GetQueuePath(), "root.level1.level2.level3.level4.level5 queue not found 
in partition")
 }
 
-func TestUpdateQueues(t *testing.T) {
-       conf := []configs.QueueConfig{
-               {
-                       Name:   "parent",
-                       Parent: false,
-                       Queues: nil,
-               },
+func assertUpdateQueues(t *testing.T, resourceType string, resMap 
map[string]string) {
+       var resExpect *resources.Resource
+       var err error
+       if len(resMap) > 0 {
+               resExpect, err = resources.NewResourceFromConf(resMap)
+               assert.NilError(t, err, "resource from conf failed")
+       } else {
+               resExpect = nil
        }
 
-       partition, err := newBasePartition()
-       assert.NilError(t, err, "partition create failed")
-       // There is a queue setup as the config must be valid when we run
-       root := partition.GetQueue("root")
-       if root == nil {
-               t.Error("root queue not found in partition")
-       }
-       err = partition.updateQueues(conf, root)
-       assert.NilError(t, err, "queue update from config failed")
-       def := partition.GetQueue(defQueue)
-       if def == nil {
-               t.Fatal("default queue should still exist")
+       var res configs.Resources
+       switch resourceType {
+       case "max":
+               res = configs.Resources{Max: resMap}
+       case "guaranteed":
+               res = configs.Resources{Guaranteed: resMap}
+       default:
+               res = configs.Resources{Max: resMap, Guaranteed: resMap}
        }
-       assert.Assert(t, def.IsDraining(), "'root.default' queue should have 
been marked for removal")
 
-       var resExpect *resources.Resource
-       resMap := map[string]string{"vcore": "1"}
-       resExpect, err = resources.NewResourceFromConf(resMap)
-       assert.NilError(t, err, "resource from conf failed")
-
-       conf = []configs.QueueConfig{
+       conf := []configs.QueueConfig{
                {
                        Name:      "parent",
                        Parent:    true,
-                       Resources: configs.Resources{Max: resMap},
+                       Resources: res,
                        Queues: []configs.QueueConfig{
                                {
                                        Name:   "leaf",
@@ -1391,19 +1382,75 @@ func TestUpdateQueues(t *testing.T) {
                        },
                },
        }
+
+       partition, err := newBasePartition()
+       assert.NilError(t, err, "partition create failed")
+
+       // There is a queue setup as the config must be valid when we run
+       root := partition.GetQueue("root")
+       if root == nil {
+               t.Error("root queue not found in partition")
+       }
+
        err = partition.updateQueues(conf, root)
        assert.NilError(t, err, "queue update from config failed")
        parent := partition.GetQueue("root.parent")
        if parent == nil {
                t.Fatal("parent queue should still exist")
        }
-       assert.Assert(t, resources.Equals(parent.GetMaxResource(), resExpect), 
"parent queue max resource should have been updated")
+       switch resourceType {
+       case "max":
+               assert.Assert(t, resources.Equals(parent.GetMaxResource(), 
resExpect), "parent queue max resource should have been updated")
+               assert.Assert(t, 
resources.Equals(parent.GetGuaranteedResource(), nil), "parent queue guaranteed 
resource should have been updated")
+       case "guaranteed":
+               assert.Assert(t, resources.Equals(parent.GetMaxResource(), 
nil), "parent queue max resource should have been updated")
+               assert.Assert(t, 
resources.Equals(parent.GetGuaranteedResource(), resExpect), "parent queue 
guaranteed resource should have been updated")
+       default:
+               assert.Assert(t, resources.Equals(parent.GetMaxResource(), 
resExpect), "parent queue max resource should have been updated")
+               assert.Assert(t, 
resources.Equals(parent.GetGuaranteedResource(), resExpect), "parent queue 
guaranteed resource should have been updated")
+       }
        leaf := partition.GetQueue("root.parent.leaf")
        if leaf == nil {
                t.Fatal("leaf queue should have been created")
        }
 }
 
+func TestUpdateQueues(t *testing.T) {
+       conf := []configs.QueueConfig{
+               {
+                       Name:   "parent",
+                       Parent: false,
+                       Queues: nil,
+               },
+       }
+
+       partition, err := newBasePartition()
+       assert.NilError(t, err, "partition create failed")
+       // There is a queue setup as the config must be valid when we run
+       root := partition.GetQueue("root")
+       if root == nil {
+               t.Error("root queue not found in partition")
+       }
+       err = partition.updateQueues(conf, root)
+       assert.NilError(t, err, "queue update from config failed")
+       def := partition.GetQueue(defQueue)
+       if def == nil {
+               t.Fatal("default queue should still exist")
+       }
+       assert.Assert(t, def.IsDraining(), "'root.default' queue should have 
been marked for removal")
+
+       assertUpdateQueues(t, "max", map[string]string{"vcore": "2"})
+       assertUpdateQueues(t, "max", map[string]string{"vcore": "5"})
+       assertUpdateQueues(t, "max", map[string]string{"memory": "5"})
+       assertUpdateQueues(t, "guaranteed", map[string]string{"vcore": "2", 
"memory": "5"})
+       assertUpdateQueues(t, "guaranteed", map[string]string{"vcore": "4", 
"memory": "3"})
+       assertUpdateQueues(t, "guaranteed", map[string]string{"vcore": "10"})
+       assertUpdateQueues(t, "both", map[string]string{"vcore": "2", "memory": 
"5"})
+       assertUpdateQueues(t, "both", map[string]string{"vcore": "5", "memory": 
"2"})
+       assertUpdateQueues(t, "both", map[string]string{"vcore": "5"})
+       assertUpdateQueues(t, "both", map[string]string{})
+}
+
 func TestGetQueue(t *testing.T) {
        // get the partition
        partition, err := newBasePartition()
@@ -2512,6 +2559,31 @@ func TestUpdateRootQueue(t *testing.T) {
        // make sure the update went through
        assert.Equal(t, partition.GetQueue("root.leaf").CurrentState(), 
objects.Draining.String(), "leaf queue should have been marked for removal")
        assert.Equal(t, partition.GetQueue("root.parent").CurrentState(), 
objects.Draining.String(), "parent queue should have been marked for removal")
+
+       // add new node, node 3 with 'memory' resource type
+       res1, err1 := resources.NewResourceFromConf(map[string]string{"vcore": 
"20", "memory": "50"})
+       assert.NilError(t, err1, "resource creation failed")
+       err = partition.AddNode(newNodeMaxResource("node-3", res1), nil)
+       assert.NilError(t, err, "test node3 add failed unexpected")
+
+       // root max resource gets updated with 'memory' resource type
+       expRes, err1 := 
resources.NewResourceFromConf(map[string]string{"vcore": "40", "memory": "50"})
+       assert.NilError(t, err1, "resource creation failed")
+       assert.Assert(t, resources.Equals(expRes, 
partition.root.GetMaxResource()), "root max resource not set as expected")
+
+       // remove node, node 3. root max resource won't have 'memory' resource 
type and updated with less 'vcore'
+       partition.removeNode("node-3")
+       assert.Assert(t, resources.Equals(res, 
partition.root.GetMaxResource()), "root max resource not set as expected")
+
+       // remove node, node 2. root max resource gets updated with less 
'vcores'
+       partition.removeNode("node-2")
+       expRes1, err1 := 
resources.NewResourceFromConf(map[string]string{"vcore": "10"})
+       assert.NilError(t, err1, "resource creation failed")
+       assert.Assert(t, resources.Equals(expRes1, 
partition.root.GetMaxResource()), "root max resource not set as expected")
+
+       // remove node, node 1. root max resource should set to nil
+       partition.removeNode("node-1")
+       assert.Assert(t, resources.Equals(nil, 
partition.root.GetMaxResource()), "root max resource not set as expected")
 }
 
 // transition an application to completed state and wait for it to be 
processed into the completedApplications map
diff --git a/pkg/scheduler/tests/application_tracking_test.go b/pkg/scheduler/tests/application_tracking_test.go
index 929264b4..36a3e5c2 100644
--- a/pkg/scheduler/tests/application_tracking_test.go
+++ b/pkg/scheduler/tests/application_tracking_test.go
@@ -73,8 +73,8 @@ func TestApplicationHistoryTracking(t *testing.T) {
        ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
        events, err = client.GetEvents()
        assert.NilError(t, err)
-       assert.Equal(t, 4, len(events.EventRecords), "number of events 
generated")
-       verifyNodeAddedEvents(t, events.EventRecords[2:])
+       assert.Equal(t, 5, len(events.EventRecords), "number of events 
generated")
+       verifyNodeAddedAndQueueMaxSetEvents(t, events.EventRecords[2:])
 
        // Add application & check events
        err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
@@ -85,8 +85,8 @@ func TestApplicationHistoryTracking(t *testing.T) {
        ms.mockRM.waitForAcceptedApplication(t, appID1, 1000)
        events, err = client.GetEvents()
        assert.NilError(t, err)
-       assert.Equal(t, 6, len(events.EventRecords), "number of events 
generated")
-       verifyAppAddedEvents(t, events.EventRecords[4:])
+       assert.Equal(t, 7, len(events.EventRecords), "number of events 
generated")
+       verifyAppAddedEvents(t, events.EventRecords[5:])
 
        // Add allocation ask & check events
        err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
@@ -109,8 +109,8 @@ func TestApplicationHistoryTracking(t *testing.T) {
        ms.mockRM.waitForAllocations(t, 1, 1000)
        events, err = client.GetEvents()
        assert.NilError(t, err)
-       assert.Equal(t, 11, len(events.EventRecords), "number of events 
generated")
-       verifyAllocationAskAddedEvents(t, events.EventRecords[6:])
+       assert.Equal(t, 12, len(events.EventRecords), "number of events 
generated")
+       verifyAllocationAskAddedEvents(t, events.EventRecords[7:])
 
        allocations := ms.mockRM.getAllocations()
        assert.Equal(t, 1, len(allocations), "number of allocations")
@@ -144,8 +144,8 @@ func TestApplicationHistoryTracking(t *testing.T) {
 
        events, err = client.GetEvents()
        assert.NilError(t, err)
-       assert.Equal(t, 14, len(events.EventRecords), "number of events 
generated")
-       verifyAllocationCancelledEvents(t, events.EventRecords[11:])
+       assert.Equal(t, 15, len(events.EventRecords), "number of events 
generated")
+       verifyAllocationCancelledEvents(t, events.EventRecords[12:])
 }
 
 func verifyQueueEvents(t *testing.T, events []*si.EventRecord) {
@@ -162,7 +162,7 @@ func verifyQueueEvents(t *testing.T, events 
[]*si.EventRecord) {
        assert.Equal(t, si.EventRecord_DETAILS_NONE, 
events[1].EventChangeDetail)
 }
 
-func verifyNodeAddedEvents(t *testing.T, events []*si.EventRecord) {
+func verifyNodeAddedAndQueueMaxSetEvents(t *testing.T, events 
[]*si.EventRecord) {
        assert.Equal(t, "node-1:1234", events[0].ObjectID)
        assert.Equal(t, "schedulable: true", events[0].Message)
        assert.Equal(t, "", events[0].ReferenceID)
@@ -170,12 +170,18 @@ func verifyNodeAddedEvents(t *testing.T, events 
[]*si.EventRecord) {
        assert.Equal(t, si.EventRecord_SET, events[0].EventChangeType)
        assert.Equal(t, si.EventRecord_NODE_SCHEDULABLE, 
events[0].EventChangeDetail)
 
-       assert.Equal(t, "node-1:1234", events[1].ObjectID)
-       assert.Equal(t, "Node added to the scheduler", events[1].Message)
+       assert.Equal(t, "root", events[1].ObjectID)
+       assert.Equal(t, "", events[1].Message)
        assert.Equal(t, "", events[1].ReferenceID)
-       assert.Equal(t, si.EventRecord_NODE, events[1].Type)
-       assert.Equal(t, si.EventRecord_ADD, events[1].EventChangeType)
-       assert.Equal(t, si.EventRecord_DETAILS_NONE, 
events[1].EventChangeDetail)
+       assert.Equal(t, si.EventRecord_SET, events[1].EventChangeType)
+       assert.Equal(t, si.EventRecord_QUEUE_MAX, events[1].EventChangeDetail)
+
+       assert.Equal(t, "node-1:1234", events[2].ObjectID)
+       assert.Equal(t, "Node added to the scheduler", events[2].Message)
+       assert.Equal(t, "", events[2].ReferenceID)
+       assert.Equal(t, si.EventRecord_NODE, events[2].Type)
+       assert.Equal(t, si.EventRecord_ADD, events[2].EventChangeType)
+       assert.Equal(t, si.EventRecord_DETAILS_NONE, 
events[2].EventChangeDetail)
 }
 
 func verifyAppAddedEvents(t *testing.T, events []*si.EventRecord) {
diff --git a/pkg/scheduler/tests/mockscheduler_test.go b/pkg/scheduler/tests/mockscheduler_test.go
index e5b7bd0e..4c81f2b9 100644
--- a/pkg/scheduler/tests/mockscheduler_test.go
+++ b/pkg/scheduler/tests/mockscheduler_test.go
@@ -25,6 +25,7 @@ import (
 
        "github.com/apache/yunikorn-core/pkg/common"
        "github.com/apache/yunikorn-core/pkg/entrypoint"
+       "github.com/apache/yunikorn-core/pkg/events"
        "github.com/apache/yunikorn-core/pkg/scheduler"
        "github.com/apache/yunikorn-core/pkg/scheduler/objects"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/api"
@@ -52,6 +53,7 @@ func (m *mockScheduler) Init(config string, autoSchedule 
bool, withWebapp bool)
        BuildInfoMap := make(map[string]string)
        BuildInfoMap["k"] = "v"
 
+       events.Init()
        m.serviceContext = entrypoint.StartAllServicesWithParams(!autoSchedule, 
withWebapp)
 
        m.proxy = m.serviceContext.RMProxy


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to