This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 7d220b51 [YUNIKORN-2169] Fix queue resource update through configmaps (#723)
7d220b51 is described below
commit 7d220b519e7476c0ae17a348b54029060858346e
Author: Manikandan R <[email protected]>
AuthorDate: Wed Nov 22 19:38:34 2023 +0530
[YUNIKORN-2169] Fix queue resource update through configmaps (#723)
Closes: #723
Signed-off-by: Manikandan R <[email protected]>
---
pkg/scheduler/objects/queue.go | 75 +++++++++++--
pkg/scheduler/objects/queue_test.go | 13 +--
pkg/scheduler/partition_test.go | 128 ++++++++++++++++++-----
pkg/scheduler/tests/application_tracking_test.go | 34 +++---
pkg/scheduler/tests/mockscheduler_test.go | 2 +
5 files changed, 195 insertions(+), 57 deletions(-)
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 56339cb8..e7eb293c 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -361,6 +361,7 @@ func (sq *Queue) setResources(resource configs.Resources)
error {
maxResource, err := resources.NewResourceFromConf(resource.Max)
if err != nil {
log.Log(log.SchedQueue).Error("parsing failed on max resources
this should not happen",
+ zap.String("queue", sq.QueuePath),
zap.Error(err))
return err
}
@@ -369,30 +370,62 @@ func (sq *Queue) setResources(resource configs.Resources)
error {
guaranteedResource, err =
resources.NewResourceFromConf(resource.Guaranteed)
if err != nil {
log.Log(log.SchedQueue).Error("parsing failed on guaranteed
resources this should not happen",
+ zap.String("queue", sq.QueuePath),
zap.Error(err))
return err
}
- if resources.StrictlyGreaterThanZero(maxResource) {
+ switch {
+ case resources.StrictlyGreaterThanZero(maxResource):
+ log.Log(log.SchedQueue).Debug("setting max resources",
+ zap.String("queue", sq.QueuePath),
+ zap.Stringer("current", sq.maxResource),
+ zap.Stringer("new", maxResource))
if !resources.Equals(sq.maxResource, maxResource) &&
sq.queueEvents != nil {
sq.queueEvents.sendMaxResourceChangedEvent()
}
sq.maxResource = maxResource
sq.updateMaxResourceMetrics()
- } else {
- log.Log(log.SchedQueue).Debug("max resources setting ignored:
cannot set zero max resources")
+ case sq.maxResource != nil:
+ log.Log(log.SchedQueue).Debug("setting max resources",
+ zap.String("queue", sq.QueuePath),
+ zap.Stringer("current", sq.maxResource),
+ zap.Stringer("new", maxResource))
+ if sq.queueEvents != nil {
+ sq.queueEvents.sendMaxResourceChangedEvent()
+ }
+ sq.maxResource = nil
+ sq.updateMaxResourceMetrics()
+ default:
+ log.Log(log.SchedQueue).Warn("max resources setting ignored:
cannot set zero max resources",
+ zap.String("queue", sq.QueuePath))
}
- if resources.StrictlyGreaterThanZero(guaranteedResource) {
+ switch {
+ case resources.StrictlyGreaterThanZero(guaranteedResource):
+ log.Log(log.SchedQueue).Debug("setting guaranteed resources",
+ zap.String("queue", sq.QueuePath),
+ zap.Stringer("current", sq.guaranteedResource),
+ zap.Stringer("new", guaranteedResource))
if !resources.Equals(sq.guaranteedResource, guaranteedResource)
&& sq.queueEvents != nil {
sq.queueEvents.sendGuaranteedResourceChangedEvent()
}
sq.guaranteedResource = guaranteedResource
sq.updateGuaranteedResourceMetrics()
- } else {
- log.Log(log.SchedQueue).Debug("guaranteed resources setting
ignored: cannot set zero max resources")
+ case sq.guaranteedResource != nil:
+ log.Log(log.SchedQueue).Debug("setting guaranteed resources",
+ zap.String("queue", sq.QueuePath),
+ zap.Stringer("current", sq.guaranteedResource),
+ zap.Stringer("new", guaranteedResource))
+ if sq.queueEvents != nil {
+ sq.queueEvents.sendGuaranteedResourceChangedEvent()
+ }
+ sq.guaranteedResource = nil
+ sq.updateGuaranteedResourceMetrics()
+ default:
+ log.Log(log.SchedQueue).Warn("guaranteed resources setting
ignored: cannot set zero guaranteed resources",
+ zap.String("queue", sq.QueuePath))
}
-
return nil
}
@@ -1266,8 +1299,32 @@ func (sq *Queue) SetMaxResource(max *resources.Resource)
{
log.Log(log.SchedQueue).Info("updating root queue max resources",
zap.Stringer("current max", sq.maxResource),
zap.Stringer("new max", max))
- sq.maxResource = max.Clone()
- sq.updateMaxResourceMetrics()
+
+ switch {
+ case resources.StrictlyGreaterThanZero(max):
+ log.Log(log.SchedQueue).Debug("setting max resources",
+ zap.String("queue", sq.QueuePath),
+ zap.Stringer("current", sq.maxResource),
+ zap.Stringer("new", max))
+ if !resources.Equals(sq.maxResource, max) && sq.queueEvents !=
nil {
+ sq.queueEvents.sendMaxResourceChangedEvent()
+ }
+ sq.maxResource = max.Clone()
+ sq.updateMaxResourceMetrics()
+ case sq.maxResource != nil:
+ log.Log(log.SchedQueue).Debug("setting max resources",
+ zap.String("queue", sq.QueuePath),
+ zap.Stringer("current", sq.maxResource),
+ zap.Stringer("new", max))
+ if sq.queueEvents != nil {
+ sq.queueEvents.sendMaxResourceChangedEvent()
+ }
+ sq.maxResource = nil
+ sq.updateMaxResourceMetrics()
+ default:
+ log.Log(log.SchedQueue).Warn("max resources setting ignored:
cannot set zero max resources",
+ zap.String("queue", sq.QueuePath))
+ }
}
// canRunApp returns if the queue could run a new app for this queue
(recursively).
diff --git a/pkg/scheduler/objects/queue_test.go
b/pkg/scheduler/objects/queue_test.go
index 2c6fe128..cb6613ef 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -1772,23 +1772,24 @@ func TestSetResources(t *testing.T) {
assert.NilError(t, err, "failed to parse resource: %v", err)
assert.DeepEqual(t, queue.maxResource, expectedMaxResource)
- // case 1: empty resource won't change the resources
+ // case 1: empty resource would set the queue resources to 'nil' if it
has been set already
+ var nilResource *resources.Resource = nil
err = queue.setResources(configs.Resources{
Guaranteed: make(map[string]string),
Max: make(map[string]string),
})
assert.NilError(t, err, "failed to set resources: %v", err)
- assert.DeepEqual(t, queue.guaranteedResource,
expectedGuaranteedResource)
- assert.DeepEqual(t, queue.maxResource, expectedMaxResource)
+ assert.DeepEqual(t, queue.guaranteedResource, nilResource)
+ assert.DeepEqual(t, queue.maxResource, nilResource)
- // case 2: zero resource won't change the resources
+ // case 2: zero resource won't change the queue resources as it is
'nil' already
err = queue.setResources(configs.Resources{
Guaranteed: getZeroResourceConf(),
Max: getZeroResourceConf(),
})
assert.NilError(t, err, "failed to set resources: %v", err)
- assert.DeepEqual(t, queue.guaranteedResource,
expectedGuaranteedResource)
- assert.DeepEqual(t, queue.maxResource, expectedMaxResource)
+ assert.DeepEqual(t, queue.guaranteedResource, nilResource)
+ assert.DeepEqual(t, queue.maxResource, nilResource)
}
func TestPreemptingResource(t *testing.T) {
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index e4780af8..3e8e09bb 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -1348,40 +1348,31 @@ func TestCreateDeepQueueConfig(t *testing.T) {
assert.Equal(t, "root.level1.level2.level3.level4.level5",
queue.GetQueuePath(), "root.level1.level2.level3.level4.level5 queue not found
in partition")
}
-func TestUpdateQueues(t *testing.T) {
- conf := []configs.QueueConfig{
- {
- Name: "parent",
- Parent: false,
- Queues: nil,
- },
+func assertUpdateQueues(t *testing.T, resourceType string, resMap
map[string]string) {
+ var resExpect *resources.Resource
+ var err error
+ if len(resMap) > 0 {
+ resExpect, err = resources.NewResourceFromConf(resMap)
+ assert.NilError(t, err, "resource from conf failed")
+ } else {
+ resExpect = nil
}
- partition, err := newBasePartition()
- assert.NilError(t, err, "partition create failed")
- // There is a queue setup as the config must be valid when we run
- root := partition.GetQueue("root")
- if root == nil {
- t.Error("root queue not found in partition")
- }
- err = partition.updateQueues(conf, root)
- assert.NilError(t, err, "queue update from config failed")
- def := partition.GetQueue(defQueue)
- if def == nil {
- t.Fatal("default queue should still exist")
+ var res configs.Resources
+ switch resourceType {
+ case "max":
+ res = configs.Resources{Max: resMap}
+ case "guaranteed":
+ res = configs.Resources{Guaranteed: resMap}
+ default:
+ res = configs.Resources{Max: resMap, Guaranteed: resMap}
}
- assert.Assert(t, def.IsDraining(), "'root.default' queue should have
been marked for removal")
- var resExpect *resources.Resource
- resMap := map[string]string{"vcore": "1"}
- resExpect, err = resources.NewResourceFromConf(resMap)
- assert.NilError(t, err, "resource from conf failed")
-
- conf = []configs.QueueConfig{
+ conf := []configs.QueueConfig{
{
Name: "parent",
Parent: true,
- Resources: configs.Resources{Max: resMap},
+ Resources: res,
Queues: []configs.QueueConfig{
{
Name: "leaf",
@@ -1391,19 +1382,75 @@ func TestUpdateQueues(t *testing.T) {
},
},
}
+
+ partition, err := newBasePartition()
+ assert.NilError(t, err, "partition create failed")
+
+ // There is a queue setup as the config must be valid when we run
+ root := partition.GetQueue("root")
+ if root == nil {
+ t.Error("root queue not found in partition")
+ }
+
err = partition.updateQueues(conf, root)
assert.NilError(t, err, "queue update from config failed")
parent := partition.GetQueue("root.parent")
if parent == nil {
t.Fatal("parent queue should still exist")
}
- assert.Assert(t, resources.Equals(parent.GetMaxResource(), resExpect),
"parent queue max resource should have been updated")
+ switch resourceType {
+ case "max":
+ assert.Assert(t, resources.Equals(parent.GetMaxResource(),
resExpect), "parent queue max resource should have been updated")
+ assert.Assert(t,
resources.Equals(parent.GetGuaranteedResource(), nil), "parent queue guaranteed
resource should have been updated")
+ case "guaranteed":
+ assert.Assert(t, resources.Equals(parent.GetMaxResource(),
nil), "parent queue max resource should have been updated")
+ assert.Assert(t,
resources.Equals(parent.GetGuaranteedResource(), resExpect), "parent queue
guaranteed resource should have been updated")
+ default:
+ assert.Assert(t, resources.Equals(parent.GetMaxResource(),
resExpect), "parent queue max resource should have been updated")
+ assert.Assert(t,
resources.Equals(parent.GetGuaranteedResource(), resExpect), "parent queue
guaranteed resource should have been updated")
+ }
leaf := partition.GetQueue("root.parent.leaf")
if leaf == nil {
t.Fatal("leaf queue should have been created")
}
}
+func TestUpdateQueues(t *testing.T) {
+ conf := []configs.QueueConfig{
+ {
+ Name: "parent",
+ Parent: false,
+ Queues: nil,
+ },
+ }
+
+ partition, err := newBasePartition()
+ assert.NilError(t, err, "partition create failed")
+ // There is a queue setup as the config must be valid when we run
+ root := partition.GetQueue("root")
+ if root == nil {
+ t.Error("root queue not found in partition")
+ }
+ err = partition.updateQueues(conf, root)
+ assert.NilError(t, err, "queue update from config failed")
+ def := partition.GetQueue(defQueue)
+ if def == nil {
+ t.Fatal("default queue should still exist")
+ }
+ assert.Assert(t, def.IsDraining(), "'root.default' queue should have
been marked for removal")
+
+ assertUpdateQueues(t, "max", map[string]string{"vcore": "2"})
+ assertUpdateQueues(t, "max", map[string]string{"vcore": "5"})
+ assertUpdateQueues(t, "max", map[string]string{"memory": "5"})
+ assertUpdateQueues(t, "guaranteed", map[string]string{"vcore": "2",
"memory": "5"})
+ assertUpdateQueues(t, "guaranteed", map[string]string{"vcore": "4",
"memory": "3"})
+ assertUpdateQueues(t, "guaranteed", map[string]string{"vcore": "10"})
+ assertUpdateQueues(t, "both", map[string]string{"vcore": "2", "memory":
"5"})
+ assertUpdateQueues(t, "both", map[string]string{"vcore": "5", "memory":
"2"})
+ assertUpdateQueues(t, "both", map[string]string{"vcore": "5"})
+ assertUpdateQueues(t, "both", map[string]string{})
+}
+
func TestGetQueue(t *testing.T) {
// get the partition
partition, err := newBasePartition()
@@ -2512,6 +2559,31 @@ func TestUpdateRootQueue(t *testing.T) {
// make sure the update went through
assert.Equal(t, partition.GetQueue("root.leaf").CurrentState(),
objects.Draining.String(), "leaf queue should have been marked for removal")
assert.Equal(t, partition.GetQueue("root.parent").CurrentState(),
objects.Draining.String(), "parent queue should have been marked for removal")
+
+ // add new node, node 3 with 'memory' resource type
+ res1, err1 := resources.NewResourceFromConf(map[string]string{"vcore":
"20", "memory": "50"})
+ assert.NilError(t, err1, "resource creation failed")
+ err = partition.AddNode(newNodeMaxResource("node-3", res1), nil)
+ assert.NilError(t, err, "test node3 add failed unexpected")
+
+ // root max resource gets updated with 'memory' resource type
+ expRes, err1 :=
resources.NewResourceFromConf(map[string]string{"vcore": "40", "memory": "50"})
+ assert.NilError(t, err1, "resource creation failed")
+ assert.Assert(t, resources.Equals(expRes,
partition.root.GetMaxResource()), "root max resource not set as expected")
+
+ // remove node, node 3. root max resource won't have 'memory' resource
type and updated with less 'vcore'
+ partition.removeNode("node-3")
+ assert.Assert(t, resources.Equals(res,
partition.root.GetMaxResource()), "root max resource not set as expected")
+
+ // remove node, node 2. root max resource gets updated with less
'vcores'
+ partition.removeNode("node-2")
+ expRes1, err1 :=
resources.NewResourceFromConf(map[string]string{"vcore": "10"})
+ assert.NilError(t, err1, "resource creation failed")
+ assert.Assert(t, resources.Equals(expRes1,
partition.root.GetMaxResource()), "root max resource not set as expected")
+
+ // remove node, node 1. root max resource should set to nil
+ partition.removeNode("node-1")
+ assert.Assert(t, resources.Equals(nil,
partition.root.GetMaxResource()), "root max resource not set as expected")
}
// transition an application to completed state and wait for it to be
processed into the completedApplications map
diff --git a/pkg/scheduler/tests/application_tracking_test.go
b/pkg/scheduler/tests/application_tracking_test.go
index 929264b4..36a3e5c2 100644
--- a/pkg/scheduler/tests/application_tracking_test.go
+++ b/pkg/scheduler/tests/application_tracking_test.go
@@ -73,8 +73,8 @@ func TestApplicationHistoryTracking(t *testing.T) {
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
events, err = client.GetEvents()
assert.NilError(t, err)
- assert.Equal(t, 4, len(events.EventRecords), "number of events
generated")
- verifyNodeAddedEvents(t, events.EventRecords[2:])
+ assert.Equal(t, 5, len(events.EventRecords), "number of events
generated")
+ verifyNodeAddedAndQueueMaxSetEvents(t, events.EventRecords[2:])
// Add application & check events
err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
@@ -85,8 +85,8 @@ func TestApplicationHistoryTracking(t *testing.T) {
ms.mockRM.waitForAcceptedApplication(t, appID1, 1000)
events, err = client.GetEvents()
assert.NilError(t, err)
- assert.Equal(t, 6, len(events.EventRecords), "number of events
generated")
- verifyAppAddedEvents(t, events.EventRecords[4:])
+ assert.Equal(t, 7, len(events.EventRecords), "number of events
generated")
+ verifyAppAddedEvents(t, events.EventRecords[5:])
// Add allocation ask & check events
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
@@ -109,8 +109,8 @@ func TestApplicationHistoryTracking(t *testing.T) {
ms.mockRM.waitForAllocations(t, 1, 1000)
events, err = client.GetEvents()
assert.NilError(t, err)
- assert.Equal(t, 11, len(events.EventRecords), "number of events
generated")
- verifyAllocationAskAddedEvents(t, events.EventRecords[6:])
+ assert.Equal(t, 12, len(events.EventRecords), "number of events
generated")
+ verifyAllocationAskAddedEvents(t, events.EventRecords[7:])
allocations := ms.mockRM.getAllocations()
assert.Equal(t, 1, len(allocations), "number of allocations")
@@ -144,8 +144,8 @@ func TestApplicationHistoryTracking(t *testing.T) {
events, err = client.GetEvents()
assert.NilError(t, err)
- assert.Equal(t, 14, len(events.EventRecords), "number of events
generated")
- verifyAllocationCancelledEvents(t, events.EventRecords[11:])
+ assert.Equal(t, 15, len(events.EventRecords), "number of events
generated")
+ verifyAllocationCancelledEvents(t, events.EventRecords[12:])
}
func verifyQueueEvents(t *testing.T, events []*si.EventRecord) {
@@ -162,7 +162,7 @@ func verifyQueueEvents(t *testing.T, events
[]*si.EventRecord) {
assert.Equal(t, si.EventRecord_DETAILS_NONE,
events[1].EventChangeDetail)
}
-func verifyNodeAddedEvents(t *testing.T, events []*si.EventRecord) {
+func verifyNodeAddedAndQueueMaxSetEvents(t *testing.T, events
[]*si.EventRecord) {
assert.Equal(t, "node-1:1234", events[0].ObjectID)
assert.Equal(t, "schedulable: true", events[0].Message)
assert.Equal(t, "", events[0].ReferenceID)
@@ -170,12 +170,18 @@ func verifyNodeAddedEvents(t *testing.T, events
[]*si.EventRecord) {
assert.Equal(t, si.EventRecord_SET, events[0].EventChangeType)
assert.Equal(t, si.EventRecord_NODE_SCHEDULABLE,
events[0].EventChangeDetail)
- assert.Equal(t, "node-1:1234", events[1].ObjectID)
- assert.Equal(t, "Node added to the scheduler", events[1].Message)
+ assert.Equal(t, "root", events[1].ObjectID)
+ assert.Equal(t, "", events[1].Message)
assert.Equal(t, "", events[1].ReferenceID)
- assert.Equal(t, si.EventRecord_NODE, events[1].Type)
- assert.Equal(t, si.EventRecord_ADD, events[1].EventChangeType)
- assert.Equal(t, si.EventRecord_DETAILS_NONE,
events[1].EventChangeDetail)
+ assert.Equal(t, si.EventRecord_SET, events[1].EventChangeType)
+ assert.Equal(t, si.EventRecord_QUEUE_MAX, events[1].EventChangeDetail)
+
+ assert.Equal(t, "node-1:1234", events[2].ObjectID)
+ assert.Equal(t, "Node added to the scheduler", events[2].Message)
+ assert.Equal(t, "", events[2].ReferenceID)
+ assert.Equal(t, si.EventRecord_NODE, events[2].Type)
+ assert.Equal(t, si.EventRecord_ADD, events[2].EventChangeType)
+ assert.Equal(t, si.EventRecord_DETAILS_NONE,
events[2].EventChangeDetail)
}
func verifyAppAddedEvents(t *testing.T, events []*si.EventRecord) {
diff --git a/pkg/scheduler/tests/mockscheduler_test.go
b/pkg/scheduler/tests/mockscheduler_test.go
index e5b7bd0e..4c81f2b9 100644
--- a/pkg/scheduler/tests/mockscheduler_test.go
+++ b/pkg/scheduler/tests/mockscheduler_test.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/entrypoint"
+ "github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-core/pkg/scheduler"
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
@@ -52,6 +53,7 @@ func (m *mockScheduler) Init(config string, autoSchedule
bool, withWebapp bool)
BuildInfoMap := make(map[string]string)
BuildInfoMap["k"] = "v"
+ events.Init()
m.serviceContext = entrypoint.StartAllServicesWithParams(!autoSchedule,
withWebapp)
m.proxy = m.serviceContext.RMProxy
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]