This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 5fdb3bcc [YUNIKORN-1608] Configuration updates and storage for limit
(#543)
5fdb3bcc is described below
commit 5fdb3bccc6821ca7e3b8e184f621d493fbdd11c0
Author: Manikandan R <[email protected]>
AuthorDate: Thu Jun 15 13:22:07 2023 +0530
[YUNIKORN-1608] Configuration updates and storage for limit (#543)
Closes: #543
Signed-off-by: Manikandan R <[email protected]>
---
pkg/scheduler/objects/utilities_test.go | 10 +-
pkg/scheduler/partition.go | 11 +-
pkg/scheduler/partition_test.go | 249 +++++++----
pkg/scheduler/ugm/group_tracker.go | 12 +
pkg/scheduler/ugm/group_tracker_test.go | 44 ++
pkg/scheduler/ugm/manager.go | 459 +++++++++++++++++++--
pkg/scheduler/ugm/manager_test.go | 334 +++++++++++++++
pkg/scheduler/ugm/queue_tracker.go | 91 +++-
pkg/scheduler/ugm/queue_tracker_test.go | 14 -
pkg/scheduler/ugm/user_tracker.go | 17 +-
pkg/scheduler/ugm/user_tracker_test.go | 50 ++-
.../ugm/{utilities_test.go => utilities.go} | 22 +-
pkg/scheduler/ugm/utilities_test.go | 18 +
pkg/scheduler/utilities_test.go | 317 +++++++++++++-
pkg/webservice/dao/ugm_info.go | 2 +
pkg/webservice/handlers.go | 1 -
16 files changed, 1458 insertions(+), 193 deletions(-)
diff --git a/pkg/scheduler/objects/utilities_test.go
b/pkg/scheduler/objects/utilities_test.go
index 5559ef39..025989b1 100644
--- a/pkg/scheduler/objects/utilities_test.go
+++ b/pkg/scheduler/objects/utilities_test.go
@@ -34,11 +34,11 @@ import (
)
const (
- appID0 = "app-0"
- appID1 = "app-1"
- appID2 = "app-2"
- aKey = "alloc-1"
- nodeID1 = "node-1"
+ appID0 = "app-0"
+ appID1 = "app-1"
+ appID2 = "app-2"
+ aKey = "alloc-1"
+ nodeID1 = "node-1"
instType1 = "itype-1"
)
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 39dd00b6..78f4ae88 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -39,6 +39,7 @@ import (
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-core/pkg/scheduler/placement"
"github.com/apache/yunikorn-core/pkg/scheduler/policies"
+ "github.com/apache/yunikorn-core/pkg/scheduler/ugm"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -128,7 +129,9 @@ func (pc *PartitionContext) initialPartitionFromConfig(conf
configs.PartitionCon
// TODO get the resolver from the config
pc.userGroupCache = security.GetUserGroupCache("")
pc.updateNodeSortingPolicy(conf)
- return nil
+
+ // update limit settings: start at the root
+ return ugm.GetUserManager().UpdateConfig(queueConf, conf.Queues[0].Name)
}
// NOTE: this is a lock free call. It should only be called holding the
PartitionContext lock.
@@ -178,7 +181,11 @@ func (pc *PartitionContext) updatePartitionDetails(conf
configs.PartitionConfig)
}
root.UpdateQueueProperties()
// update the rest of the queues recursively
- return pc.updateQueues(queueConf.Queues, root)
+ if err := pc.updateQueues(queueConf.Queues, root); err != nil {
+ return err
+ }
+ // update limit settings: start at the root
+ return ugm.GetUserManager().UpdateConfig(queueConf, conf.Queues[0].Name)
}
// Process the config structure and create a queue info tree for this partition
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 1da23729..665acc1d 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -94,6 +94,22 @@ func TestNewPartition(t *testing.T) {
Parent: true,
SubmitACL: "*",
Queues: nil,
+ Limits: []configs.Limit{
+ {
+ Limit: "root queue limit",
+ Users: []string{
+ "user1",
+ },
+ Groups: []string{
+ "group1",
+ },
+ MaxResources: map[string]string{
+ "memory": "10",
+ "vcores": "10",
+ },
+ MaxApplications: 1,
+ },
+ },
},
},
}
@@ -254,7 +270,7 @@ func TestAddNodeWithAllocations(t *testing.T) {
t.Errorf("add node to partition should have failed (uuid
missing)")
}
assert.Equal(t, partition.nodes.GetNodeCount(), 0, "error returned but
node still added to the partition (uuid)")
- assertUserGroupResource(t, getTestUserGroup(), nil)
+ assertLimits(t, getTestUserGroup(), nil)
// fix the alloc add the node will work now
alloc = objects.NewAllocation("alloc-1-uuid", nodeID1, instType1, ask)
@@ -269,7 +285,7 @@ func TestAddNodeWithAllocations(t *testing.T) {
// check the queue usage
assert.Assert(t, resources.Equals(q.GetAllocatedResource(), appRes),
"add node to partition did not update queue as expected")
- assertUserGroupResource(t, getTestUserGroup(), appRes)
+ assertLimits(t, getTestUserGroup(), appRes)
}
func TestRemoveNode(t *testing.T) {
@@ -313,7 +329,7 @@ func TestRemoveNodeWithAllocations(t *testing.T) {
// get what was allocated
allocated := node.GetAllAllocations()
assert.Equal(t, 1, len(allocated), "allocation not added correctly")
- assertUserGroupResource(t, getTestUserGroup(), appRes)
+ assertLimits(t, getTestUserGroup(), appRes)
// add broken allocations
ask = newAllocationAsk("alloc-na", "not-an-app", appRes)
@@ -322,7 +338,7 @@ func TestRemoveNodeWithAllocations(t *testing.T) {
ask = newAllocationAsk("alloc-2", appID1, appRes)
alloc = objects.NewAllocation("alloc-2-uuid", nodeID1, instType1, ask)
node.AddAllocation(alloc)
- assertUserGroupResource(t, getTestUserGroup(), appRes)
+ assertLimits(t, getTestUserGroup(), appRes)
// remove the node this cannot fail
released, confirmed := partition.removeNode(nodeID1)
@@ -330,7 +346,7 @@ func TestRemoveNodeWithAllocations(t *testing.T) {
assert.Equal(t, 1, len(released), "node did not release correct
allocation")
assert.Equal(t, 0, len(confirmed), "node did not confirm correct
allocation")
assert.Equal(t, released[0].GetUUID(), allocUUID, "uuid returned by
release not the same as on allocation")
- assertUserGroupResource(t, getTestUserGroup(), nil)
+ assertLimits(t, getTestUserGroup(), nil)
}
// test with a replacement of a placeholder: placeholder and real on the same
node that gets removed
@@ -357,7 +373,7 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
allocated := node1.GetAllAllocations()
assert.Equal(t, 1, len(allocated), "allocation not added correctly to
node1 expected 1 got: %v", allocated)
assert.Assert(t, resources.Equals(node1.GetAllocatedResource(),
appRes), "allocation not added correctly to node1")
- assertUserGroupResource(t, getTestUserGroup(), appRes)
+ assertLimits(t, getTestUserGroup(), appRes)
// fake an ask that is used
ask = newAllocationAskAll(allocID, appID1, taskGroup, appRes, 1, 1,
false)
@@ -366,7 +382,7 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
_, err = app.UpdateAskRepeat(allocID, -1)
assert.NilError(t, err, "ask should have been updated without error")
assert.Assert(t, resources.IsZero(app.GetPendingResource()), "app
should not have pending resources")
- assertUserGroupResource(t, getTestUserGroup(), appRes)
+ assertLimits(t, getTestUserGroup(), appRes)
// add real allocation that is replacing the placeholder
alloc := objects.NewAllocation(allocID, nodeID1, instType1, ask)
@@ -377,7 +393,7 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
allocs = app.GetAllAllocations()
assert.Equal(t, len(allocs), 1, "expected one allocation for the app
(placeholder)")
- assertUserGroupResource(t, getTestUserGroup(), appRes)
+ assertLimits(t, getTestUserGroup(), appRes)
// remove the node that has both placeholder and real allocation
released, confirmed := partition.removeNode(nodeID1)
@@ -390,7 +406,7 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
assert.Assert(t, resources.Equals(app.GetPendingResource(), appRes),
"app should have updated pending resources")
// check the interim state of the placeholder involved
assert.Equal(t, 0, ph.GetReleaseCount(), "placeholder should have no
releases linked anymore")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}))
+ assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}))
}
func TestCalculateNodesResourceUsage(t *testing.T) {
@@ -758,7 +774,7 @@ func TestRemoveNodeWithReplacement(t *testing.T) {
// get what was allocated
allocated := node1.GetAllAllocations()
assert.Equal(t, 1, len(allocated), "allocation not added correctly to
node1")
- assertUserGroupResource(t, getTestUserGroup(), appRes)
+ assertLimits(t, getTestUserGroup(), appRes)
assert.Assert(t, resources.Equals(node1.GetAllocatedResource(),
appRes), "allocation not added correctly to node1")
node2 := setupNode(t, nodeID2, partition, nodeRes)
@@ -780,14 +796,14 @@ func TestRemoveNodeWithReplacement(t *testing.T) {
allocated = node2.GetAllAllocations()
assert.Equal(t, 1, len(allocated), "allocation not added correctly to
node2")
assert.Assert(t, resources.Equals(node2.GetAllocatedResource(),
appRes), "allocation not added correctly to node2 (resource count)")
- assertUserGroupResource(t, getTestUserGroup(), appRes)
+ assertLimits(t, getTestUserGroup(), appRes)
// double link as if the replacement is ongoing
ph.SetRelease(alloc)
allocs = app.GetAllAllocations()
assert.Equal(t, len(allocs), 1, "expected one allocation for the app
(placeholder)")
- assertUserGroupResource(t, getTestUserGroup(), appRes)
+ assertLimits(t, getTestUserGroup(), appRes)
// remove the node with the placeholder
released, confirmed := partition.removeNode(nodeID1)
@@ -803,7 +819,7 @@ func TestRemoveNodeWithReplacement(t *testing.T) {
assert.Equal(t, allocID, allocs[0].GetUUID(), "uuid for the app is not
the same as the real allocation")
assert.Equal(t, objects.Allocated, allocs[0].GetResult(), "allocation
state should be allocated")
assert.Equal(t, 0, allocs[0].GetReleaseCount(), "real allocation should
have no releases linked anymore")
- assertUserGroupResource(t, getTestUserGroup(), appRes)
+ assertLimits(t, getTestUserGroup(), appRes)
}
// test with a replacement of a placeholder: real on the removed node
placeholder on the 2nd node
@@ -830,7 +846,7 @@ func TestRemoveNodeWithReal(t *testing.T) {
allocated := node1.GetAllAllocations()
assert.Equal(t, 1, len(allocated), "allocation not added correctly to
node1")
assert.Assert(t, resources.Equals(node1.GetAllocatedResource(),
appRes), "allocation not added correctly to node1")
- assertUserGroupResource(t, getTestUserGroup(), appRes)
+ assertLimits(t, getTestUserGroup(), appRes)
node2 := setupNode(t, nodeID2, partition, nodeRes)
assert.Equal(t, 2, partition.GetTotalNodeCount(), "node list was not
updated as expected")
@@ -851,7 +867,7 @@ func TestRemoveNodeWithReal(t *testing.T) {
allocated = node2.GetAllAllocations()
assert.Equal(t, 1, len(allocated), "allocation not added correctly to
node2")
assert.Assert(t, resources.Equals(node2.GetAllocatedResource(),
appRes), "allocation not added correctly to node2 (resource count)")
- assertUserGroupResource(t, getTestUserGroup(), appRes)
+ assertLimits(t, getTestUserGroup(), appRes)
// double link as if the replacement is ongoing
ph.SetRelease(alloc)
@@ -869,7 +885,7 @@ func TestRemoveNodeWithReal(t *testing.T) {
assert.Equal(t, 1, len(allocs), "expected one allocation for the app
(placeholder")
assert.Equal(t, phID, allocs[0].GetUUID(), "uuid for the app is not the
same as the real allocation")
assert.Equal(t, 0, ph.GetReleaseCount(), "no inflight replacements
linked")
- assertUserGroupResource(t, getTestUserGroup(), appRes)
+ assertLimits(t, getTestUserGroup(), appRes)
}
func TestAddApp(t *testing.T) {
@@ -983,7 +999,7 @@ func TestRemoveApp(t *testing.T) {
alloc := objects.NewAllocation(uuid, nodeID1, instType1, ask)
err = partition.addAllocation(alloc)
assert.NilError(t, err, "add allocation to partition should not have
failed")
- assertUserGroupResource(t, getTestUserGroup(), appRes)
+ assertLimits(t, getTestUserGroup(), appRes)
allocs := partition.removeApplication("does_not_exist")
if allocs != nil {
@@ -994,13 +1010,13 @@ func TestRemoveApp(t *testing.T) {
app = newApplication(appID1, "default", defQueue)
err = partition.AddApplication(app)
assert.NilError(t, err, "add application to partition should not have
failed")
- assertUserGroupResource(t, getTestUserGroup(), appRes)
+ assertLimits(t, getTestUserGroup(), appRes)
// remove the newly added app (no allocations)
allocs = partition.removeApplication(appID1)
assert.Equal(t, 0, len(allocs), "existing application without
allocations returned allocations %v", allocs)
assert.Equal(t, 1, len(partition.applications), "existing application
was not removed")
- assertUserGroupResource(t, getTestUserGroup(), appRes)
+ assertLimits(t, getTestUserGroup(), appRes)
// add the application again and then an allocation
err = partition.AddApplication(app)
@@ -1010,7 +1026,7 @@ func TestRemoveApp(t *testing.T) {
alloc = objects.NewAllocation("alloc-1-uuid", nodeID1, instType1, ask)
err = partition.addAllocation(alloc)
assert.NilError(t, err, "add allocation to partition should not have
failed")
- assertUserGroupResource(t, getTestUserGroup(),
resources.Multiply(appRes, 2))
+ assertLimits(t, getTestUserGroup(), resources.Multiply(appRes, 2))
// remove the newly added app
allocs = partition.removeApplication(appID1)
@@ -1019,11 +1035,11 @@ func TestRemoveApp(t *testing.T) {
if partition.GetTotalAllocationCount() != 1 {
t.Errorf("allocation that should have been left was removed")
}
- assertUserGroupResource(t, getTestUserGroup(), appRes)
+ assertLimits(t, getTestUserGroup(), appRes)
allocs = partition.removeApplication("will_not_remove")
assert.Equal(t, 1, len(allocs), "existing application with allocations
returned unexpected allocations %v", allocs)
- assertUserGroupResource(t, getTestUserGroup(), nil)
+ assertLimits(t, getTestUserGroup(), nil)
}
func TestRemoveAppAllocs(t *testing.T) {
@@ -1043,14 +1059,14 @@ func TestRemoveAppAllocs(t *testing.T) {
ask := newAllocationAsk("alloc-nr", appNotRemoved, appRes)
alloc := objects.NewAllocation("alloc-nr-uuid", nodeID1, instType1, ask)
err = partition.addAllocation(alloc)
- assertUserGroupResource(t, getTestUserGroup(), appRes)
+ assertLimits(t, getTestUserGroup(), appRes)
ask = newAllocationAsk("alloc-1", appNotRemoved, appRes)
uuid := "alloc-1-uuid"
alloc = objects.NewAllocation(uuid, nodeID1, instType1, ask)
err = partition.addAllocation(alloc)
assert.NilError(t, err, "add allocation to partition should not have
failed")
- assertUserGroupResource(t, getTestUserGroup(),
resources.Multiply(appRes, 2))
+ assertLimits(t, getTestUserGroup(), resources.Multiply(appRes, 2))
release := &si.AllocationRelease{
PartitionName: "default",
ApplicationID: "",
@@ -1060,31 +1076,31 @@ func TestRemoveAppAllocs(t *testing.T) {
allocs, _ := partition.removeAllocation(release)
assert.Equal(t, 0, len(allocs), "empty removal request returned
allocations: %v", allocs)
- assertUserGroupResource(t, getTestUserGroup(),
resources.Multiply(appRes, 2))
+ assertLimits(t, getTestUserGroup(), resources.Multiply(appRes, 2))
// create a new release without app: should just return
release.ApplicationID = "does_not_exist"
allocs, _ = partition.removeAllocation(release)
assert.Equal(t, 0, len(allocs), "removal request for non existing
application returned allocations: %v", allocs)
- assertUserGroupResource(t, getTestUserGroup(),
resources.Multiply(appRes, 2))
+ assertLimits(t, getTestUserGroup(), resources.Multiply(appRes, 2))
// create a new release with app, non existing allocation: should just
return
release.ApplicationID = appNotRemoved
release.UUID = "does_not_exist"
allocs, _ = partition.removeAllocation(release)
assert.Equal(t, 0, len(allocs), "removal request for non existing
allocation returned allocations: %v", allocs)
- assertUserGroupResource(t, getTestUserGroup(),
resources.Multiply(appRes, 2))
+ assertLimits(t, getTestUserGroup(), resources.Multiply(appRes, 2))
// create a new release with app, existing allocation: should return 1
alloc
assert.Equal(t, 2, partition.GetTotalAllocationCount(), "pre-remove
allocation list incorrect: %v", partition.allocations)
release.UUID = uuid
allocs, _ = partition.removeAllocation(release)
assert.Equal(t, 1, len(allocs), "removal request for existing
allocation returned wrong allocations: %v", allocs)
assert.Equal(t, 1, partition.GetTotalAllocationCount(), "allocation
removal requests removed more than expected: %v", partition.allocations)
- assertUserGroupResource(t, getTestUserGroup(),
resources.Multiply(appRes, 1))
+ assertLimits(t, getTestUserGroup(), resources.Multiply(appRes, 1))
// create a new release with app, no uuid: should return last left alloc
release.UUID = ""
allocs, _ = partition.removeAllocation(release)
assert.Equal(t, 1, len(allocs), "removal request for existing
allocation returned wrong allocations: %v", allocs)
assert.Equal(t, 0, partition.GetTotalAllocationCount(), "removal
requests did not remove all allocations: %v", partition.allocations)
- assertUserGroupResource(t, getTestUserGroup(), nil)
+ assertLimits(t, getTestUserGroup(), nil)
}
// Dynamic queue creation based on the name from the rules
@@ -1308,7 +1324,15 @@ func TestTryAllocate(t *testing.T) {
assert.NilError(t, err, "failed to add app-2 to partition")
err = app.AddAllocationAsk(newAllocationAskPriority(allocID, appID2,
res, 1, 2))
assert.NilError(t, err, "failed to add ask alloc-1 to app-2")
- assertUserGroupResource(t, getTestUserGroup(), nil)
+
+ expectedQueuesMaxLimits := make(map[string]map[string]interface{})
+ expectedQueuesMaxLimits["root"] = make(map[string]interface{})
+ expectedQueuesMaxLimits["root.leaf"] = make(map[string]interface{})
+ expectedQueuesMaxLimits["root"][maxresources] =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10,
"vcores": 10})
+ expectedQueuesMaxLimits["root.leaf"][maxresources] =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 5,
"vcores": 5})
+ expectedQueuesMaxLimits["root"][maxapplications] = uint64(2)
+ expectedQueuesMaxLimits["root.leaf"][maxapplications] = uint64(1)
+ assertUserGroupResourceMaxLimits(t, getTestUserGroup(), nil,
expectedQueuesMaxLimits)
// first allocation should be app-1 and alloc-2
alloc := partition.tryAllocate()
@@ -1319,7 +1343,7 @@ func TestTryAllocate(t *testing.T) {
assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations
should have been 0")
assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application
app-1 to be allocated")
assert.Equal(t, alloc.GetAllocationKey(), allocID2, "expected ask
alloc-2 to be allocated")
- assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res,
1))
+ assertUserGroupResourceMaxLimits(t, getTestUserGroup(),
resources.Multiply(res, 1), expectedQueuesMaxLimits)
// second allocation should be app-2 and alloc-1: higher up in the
queue hierarchy
alloc = partition.tryAllocate()
@@ -1330,7 +1354,7 @@ func TestTryAllocate(t *testing.T) {
assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations
should have been 0")
assert.Equal(t, alloc.GetApplicationID(), appID2, "expected application
app-2 to be allocated")
assert.Equal(t, alloc.GetAllocationKey(), allocID, "expected ask
alloc-1 to be allocated")
- assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res,
2))
+ assertUserGroupResourceMaxLimits(t, getTestUserGroup(),
resources.Multiply(res, 2), expectedQueuesMaxLimits)
// third allocation should be app-1 and alloc-1
alloc = partition.tryAllocate()
@@ -1342,7 +1366,7 @@ func TestTryAllocate(t *testing.T) {
assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application
app-1 to be allocated")
assert.Equal(t, alloc.GetAllocationKey(), allocID, "expected ask
alloc-1 to be allocated")
assert.Assert(t, resources.IsZero(partition.root.GetPendingResource()),
"pending resources should be set to zero")
- assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res,
3))
+ assertUserGroupResourceMaxLimits(t, getTestUserGroup(),
resources.Multiply(res, 3), expectedQueuesMaxLimits)
}
// allocate ask request with required node
@@ -1367,7 +1391,7 @@ func TestRequiredNodeReservation(t *testing.T) {
ask.SetRequiredNode(nodeID1)
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "failed to add ask alloc-1 to app-1")
- assertUserGroupResource(t, getTestUserGroup(), nil)
+ assertLimits(t, getTestUserGroup(), nil)
// first allocation should be app-1 and alloc-1
alloc := partition.tryAllocate()
@@ -1378,7 +1402,7 @@ func TestRequiredNodeReservation(t *testing.T) {
assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations
should have been 0")
assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application
app-1 to be allocated")
assert.Equal(t, alloc.GetAllocationKey(), allocID, "expected ask
alloc-1 to be allocated")
- assertUserGroupResource(t, getTestUserGroup(), res)
+ assertLimits(t, getTestUserGroup(), res)
ask2 := newAllocationAsk(allocID2, appID1, res)
ask2.SetRequiredNode(nodeID1)
@@ -1391,7 +1415,7 @@ func TestRequiredNodeReservation(t *testing.T) {
// check if updated (must be after allocate call)
assert.Equal(t, 1, len(app.GetReservations()), "app should have one
reserved ask")
assert.Equal(t, 1, len(app.GetAskReservations(allocID2)), "ask should
have been reserved")
- assertUserGroupResource(t, getTestUserGroup(), res)
+ assertLimits(t, getTestUserGroup(), res)
// allocation that fits on the node should not be allocated
var res2 *resources.Resource
@@ -1409,7 +1433,7 @@ func TestRequiredNodeReservation(t *testing.T) {
// reservation count remains same as last try allocate should have
failed to find a reservation
assert.Equal(t, 1, len(app.GetReservations()), "ask should not have
been reserved, count changed")
- assertUserGroupResource(t, getTestUserGroup(), res)
+ assertLimits(t, getTestUserGroup(), res)
}
// allocate ask request with required node having non daemon set reservations
@@ -1595,7 +1619,7 @@ func TestRequiredNodeNotExist(t *testing.T) {
if alloc != nil {
t.Fatal("allocation should not have worked on unknown node")
}
- assertUserGroupResource(t, getTestUserGroup(), nil)
+ assertLimits(t, getTestUserGroup(), nil)
}
// basic ds scheduling on specific node in first allocate run itself (without
any need for reservation)
@@ -1630,7 +1654,7 @@ func TestRequiredNodeAllocation(t *testing.T) {
assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations
should have been 0")
assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application
app-1 to be allocated")
assert.Equal(t, alloc.GetAllocationKey(), allocID, "expected ask
alloc-1 to be allocated")
- assertUserGroupResource(t, getTestUserGroup(), res)
+ assertLimits(t, getTestUserGroup(), res)
// required node set on ask
ask2 := newAllocationAsk(allocID2, appID1, res)
@@ -1647,7 +1671,7 @@ func TestRequiredNodeAllocation(t *testing.T) {
assert.Equal(t, 0, len(app.GetReservations()), "ask should not have
been reserved")
assert.Equal(t, alloc.GetAllocationKey(), allocID2, "expected ask
alloc-2 to be allocated")
assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not
the expected allocated")
- assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res,
2))
+ assertLimits(t, getTestUserGroup(), resources.Multiply(res, 2))
}
func TestPreemption(t *testing.T) {
@@ -1707,7 +1731,7 @@ func TestPreemption(t *testing.T) {
assert.Equal(t, 0, len(app2.GetReservations()), "ask should not be
reserved")
assert.Equal(t, alloc.GetResult(), objects.Allocated, "result should be
allocated")
assert.Equal(t, alloc.GetAllocationKey(), allocID3, "expected ask
alloc-3 to be allocated")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 10000}))
+ assertUserGroupResourceMaxLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 10000}),
getExpectedQueuesLimitsForPreemption())
}
// Preemption followed by a normal allocation
@@ -1739,7 +1763,7 @@ func TestPreemptionForRequiredNodeReservedAlloc(t
*testing.T) {
assert.Equal(t, alloc.GetResult(), objects.AllocatedReserved, "result
is not the expected AllocatedReserved")
assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations
should have been 0")
assert.Equal(t, alloc.GetAllocationKey(), allocID2, "expected ask
alloc-2 to be allocated")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 8000}))
+ assertUserGroupResourceMaxLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 8000}),
getExpectedQueuesLimitsForPreemption())
}
func TestPreemptionForRequiredNodeMultipleAttemptsAvoided(t *testing.T) {
@@ -1790,6 +1814,38 @@ func
TestPreemptionForRequiredNodeMultipleAttemptsAvoided(t *testing.T) {
assert.Equal(t, true, alloc.IsPreempted())
}
+func getExpectedQueuesLimitsForPreemption() map[string]map[string]interface{} {
+ expectedQueuesMaxLimits := make(map[string]map[string]interface{})
+ expectedQueuesMaxLimits["root"] = make(map[string]interface{})
+ expectedQueuesMaxLimits["root.parent"] = make(map[string]interface{})
+ expectedQueuesMaxLimits["root.parent.leaf1"] =
make(map[string]interface{})
+ expectedQueuesMaxLimits["root"][maxresources] =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10,
"vcores": 10})
+ expectedQueuesMaxLimits["root.parent"][maxresources] =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 5,
"vcores": 5})
+ expectedQueuesMaxLimits["root.parent.leaf1"][maxresources] =
expectedQueuesMaxLimits["root.parent"][maxresources]
+ expectedQueuesMaxLimits["root"][maxapplications] = uint64(2)
+ expectedQueuesMaxLimits["root.parent"][maxapplications] = uint64(2)
+ expectedQueuesMaxLimits["root.parent.leaf1"][maxapplications] =
uint64(1)
+ expectedQueuesMaxLimits["root.parent.leaf1"][maxapplications] =
uint64(1)
+ return expectedQueuesMaxLimits
+}
+
+func getExpectedQueuesLimitsForPreemptionWithRequiredNode()
map[string]map[string]interface{} {
+ expectedQueuesMaxLimits := make(map[string]map[string]interface{})
+ expectedQueuesMaxLimits["root"] = make(map[string]interface{})
+ expectedQueuesMaxLimits["root.leaf"] = make(map[string]interface{})
+ expectedQueuesMaxLimits["root.parent"] = make(map[string]interface{})
+ expectedQueuesMaxLimits["root.parent.sub-leaf"] =
make(map[string]interface{})
+ expectedQueuesMaxLimits["root"][maxresources] =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10,
"vcores": 10})
+ expectedQueuesMaxLimits["root.leaf"][maxresources] =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 5,
"vcores": 5})
+ expectedQueuesMaxLimits["root.parent"][maxresources] =
expectedQueuesMaxLimits["root.leaf"][maxresources]
+ expectedQueuesMaxLimits["root.parent.sub-leaf"][maxresources] =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 3,
"vcores": 3})
+ expectedQueuesMaxLimits["root"][maxapplications] = uint64(2)
+ expectedQueuesMaxLimits["root.leaf"][maxapplications] = uint64(1)
+ expectedQueuesMaxLimits["root.parent"][maxapplications] = uint64(2)
+ expectedQueuesMaxLimits["root.parent.sub-leaf"][maxapplications] =
uint64(2)
+ return expectedQueuesMaxLimits
+}
+
// setup the partition with existing allocations so we can test preemption
func setupPreemption(t *testing.T) (*PartitionContext, *objects.Application,
*objects.Application, *objects.Allocation, *objects.Allocation) {
partition := createPreemptionQueuesNodes(t)
@@ -1823,7 +1879,8 @@ func setupPreemption(t *testing.T) (*PartitionContext,
*objects.Application, *ob
assert.Equal(t, alloc1.GetApplicationID(), appID1, "expected
application app-1 to be allocated")
assert.Equal(t, alloc1.GetAllocationKey(), allocID, "expected ask
alloc-1 to be allocated")
assert.Equal(t, alloc1.GetNodeID(), nodeID1, "expected alloc-1 on
node-1")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 5000}))
+
+ assertUserGroupResourceMaxLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 5000}),
getExpectedQueuesLimitsForPreemption())
// ask 2
ask2 := newAllocationAskPreempt(allocID2, appID1, 1, res)
@@ -1840,7 +1897,7 @@ func setupPreemption(t *testing.T) (*PartitionContext,
*objects.Application, *ob
assert.Equal(t, alloc2.GetApplicationID(), appID1, "expected
application app-1 to be allocated")
assert.Equal(t, alloc2.GetAllocationKey(), allocID2, "expected ask
alloc-2 to be allocated")
assert.Equal(t, alloc2.GetNodeID(), nodeID2, "expected alloc-2 on
node-2")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 10000}))
+ assertUserGroupResourceMaxLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 10000}),
getExpectedQueuesLimitsForPreemption())
app2, _ := newApplicationWithHandler(appID2, "default",
"root.parent.leaf2")
@@ -1883,7 +1940,7 @@ func setupPreemptionForRequiredNode(t *testing.T)
(*PartitionContext, *objects.A
assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations
should have been 0")
assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application
app-1 to be allocated")
assert.Equal(t, alloc.GetAllocationKey(), allocID, "expected ask
alloc-1 to be allocated")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 8000}))
+ assertUserGroupResourceMaxLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 8000}),
getExpectedQueuesLimitsForPreemptionWithRequiredNode())
uuid := alloc.GetUUID()
// required node set on ask
@@ -1900,7 +1957,7 @@ func setupPreemptionForRequiredNode(t *testing.T)
(*PartitionContext, *objects.A
// check if updated (must be after allocate call)
assert.Equal(t, 1, len(app.GetReservations()), "ask should have been
reserved")
assert.Equal(t, 1, len(app.GetAskReservations(allocID2)), "ask should
have been reserved")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 8000}))
+ assertUserGroupResourceMaxLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 8000}),
getExpectedQueuesLimitsForPreemptionWithRequiredNode())
// try through reserved scheduling cycle this should trigger preemption
alloc = partition.tryReservedAllocate()
@@ -1926,7 +1983,7 @@ func setupPreemptionForRequiredNode(t *testing.T)
(*PartitionContext, *objects.A
}
releases, _ := partition.removeAllocation(release)
assert.Equal(t, 1, len(releases), "unexpected number of allocations
released")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 0}))
+ assertUserGroupResourceMaxLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 0}),
getExpectedQueuesLimitsForPreemptionWithRequiredNode())
return partition, app
}
@@ -1960,7 +2017,7 @@ func TestTryAllocateLarge(t *testing.T) {
t.Fatalf("allocation did return allocation which does not fit:
%s", alloc)
}
assert.Equal(t, 0, len(app.GetReservations()), "ask should not have
been reserved")
- assertUserGroupResource(t, getTestUserGroup(), nil)
+ assertLimits(t, getTestUserGroup(), nil)
}
func TestAllocReserveNewNode(t *testing.T) {
@@ -2003,7 +2060,7 @@ func TestAllocReserveNewNode(t *testing.T) {
t.Fatal("1st allocation did not return the correct allocation")
}
assert.Equal(t, objects.Allocated, alloc.GetResult(), "allocation
result should have been allocated")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 8000}))
+ assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 8000}))
// the second one should be reserved as the 2nd node is not scheduling
alloc = partition.tryAllocate()
@@ -2012,7 +2069,7 @@ func TestAllocReserveNewNode(t *testing.T) {
}
// check if updated (must be after allocate call)
assert.Equal(t, 1, len(app.GetReservations()), "ask should have been
reserved")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 8000}))
+ assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 8000}))
// turn on 2nd node
node2.SetSchedulable(true)
@@ -2024,7 +2081,7 @@ func TestAllocReserveNewNode(t *testing.T) {
node1 := partition.GetNode(nodeID1)
assert.Equal(t, 0, len(node1.GetReservationKeys()), "old node should
have no more reservations")
assert.Equal(t, 0, len(app.GetReservations()), "ask should have been
reserved")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 16000}))
+ assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 16000}))
}
func TestTryAllocateReserve(t *testing.T) {
@@ -2073,7 +2130,7 @@ func TestTryAllocateReserve(t *testing.T) {
assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations
should have been 0")
assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application
app-1 to be allocated")
assert.Equal(t, alloc.GetAllocationKey(), "alloc-2", "expected ask
alloc-2 to be allocated")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000}))
+ assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000}))
// reservations should have been removed: it is in progress
if app.IsReservedOnNode(node2.NodeID) ||
len(app.GetAskReservations("alloc-2")) != 0 {
@@ -2094,7 +2151,7 @@ func TestTryAllocateReserve(t *testing.T) {
assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations
should have been 0")
assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application
app-1 to be allocated")
assert.Equal(t, alloc.GetAllocationKey(), "alloc-1", "expected ask
alloc-1 to be allocated")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 2000}))
+ assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 2000}))
if !resources.IsZero(partition.root.GetPendingResource()) {
t.Fatalf("pending allocations should be set to zero")
@@ -2139,7 +2196,7 @@ func TestTryAllocateWithReserved(t *testing.T) {
assert.Equal(t, "", alloc.GetReservedNodeID(), "reserved node should be
reset after processing")
assert.Equal(t, 0, len(node2.GetReservationKeys()), "reservation should
have been removed from node")
assert.Equal(t, false, app.IsReservedOnNode(node2.NodeID), "reservation
cleanup for ask on app failed")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 5000}))
+ assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 5000}))
// node2 is unreserved now so the next one should allocate on the 2nd
node (fair sharing)
alloc = partition.tryAllocate()
@@ -2148,7 +2205,7 @@ func TestTryAllocateWithReserved(t *testing.T) {
}
assert.Equal(t, objects.Allocated, alloc.GetResult(), "expected
allocated allocation to be returned")
assert.Equal(t, node2.NodeID, alloc.GetNodeID(), "expected allocation
on node2 to be returned")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 10000}))
+ assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 10000}))
}
// remove the reserved ask while allocating in flight for the ask
@@ -2186,7 +2243,7 @@ func TestScheduleRemoveReservedAsk(t *testing.T) {
t.Fatalf("expected allocated allocation to be returned
(step %d) %s", i, alloc)
}
}
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 16000}))
+ assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 16000}))
// add a asks which should reserve
ask = newAllocationAskRepeat("alloc-2", appID1, res, 1)
@@ -2206,7 +2263,7 @@ func TestScheduleRemoveReservedAsk(t *testing.T) {
assert.Equal(t, len(app.GetReservations()), i, "application
reservations incorrect")
}
assert.Equal(t, len(app.GetReservations()), 2, "application
reservations should be 2")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 16000}))
+ assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 16000}))
// add a node
node := newNodeMaxResource("node-3", res)
@@ -2218,7 +2275,7 @@ func TestScheduleRemoveReservedAsk(t *testing.T) {
if alloc == nil || alloc.GetResult() != objects.AllocatedReserved {
t.Fatalf("expected allocatedReserved allocation to be returned
%v", alloc)
}
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 20000}))
+ assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 20000}))
// before confirming remove the ask: do what the scheduler does when it
gets a request from a
// shim in processAllocationReleaseByAllocationKey()
@@ -2230,13 +2287,13 @@ func TestScheduleRemoveReservedAsk(t *testing.T) {
released := app.RemoveAllocationAsk(removeAskID)
assert.Equal(t, released, 1, "expected one reservations to be released")
assert.Equal(t, len(app.GetReservations()), 1, "application
reservations should be 1")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 20000}))
+ assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 20000}))
// now confirm the allocation: this should not remove the reservation
rmAlloc := partition.allocate(alloc)
assert.Equal(t, "", rmAlloc.GetReservedNodeID(), "reserved node should
be reset after processing")
assert.Equal(t, len(app.GetReservations()), 1, "application
reservations should be kept at 1")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 20000}))
+ assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 20000}))
}
// update the config with nodes registered and make sure that the root max and
guaranteed are not changed
@@ -2258,6 +2315,22 @@ func TestUpdateRootQueue(t *testing.T) {
Parent: true,
SubmitACL: "*",
Queues: nil,
+ Limits: []configs.Limit{
+ {
+ Limit: "root queue limit",
+ Users: []string{
+ "testuser",
+ },
+ Groups: []string{
+ "testgroup",
+ },
+ MaxResources: map[string]string{
+ "memory": "10",
+ "vcores": "10",
+ },
+ MaxApplications: 2,
+ },
+ },
},
},
PlacementRules: nil,
@@ -2538,7 +2611,7 @@ func TestPlaceholderSmallerThanReal(t *testing.T) {
assert.Assert(t, resources.Equals(phRes,
app.GetQueue().GetAllocatedResource()), "placeholder size should be allocated
on queue")
assert.Assert(t, resources.Equals(phRes, node.GetAllocatedResource()),
"placeholder size should be allocated on node")
assert.Equal(t, 1, partition.GetTotalAllocationCount(), "placeholder
allocation should be registered on the partition")
- assertUserGroupResource(t, getTestUserGroup(), phRes)
+ assertLimits(t, getTestUserGroup(), phRes)
// add an ask which is larger than the placeholder
ask = newAllocationAskTG(allocID, appID1, taskGroup, tgRes, false)
@@ -2549,7 +2622,7 @@ func TestPlaceholderSmallerThanReal(t *testing.T) {
t.Fatal("allocation should not have matched placeholder")
}
assert.Assert(t, ph.IsReleased(), "placeholder should be released")
- assertUserGroupResource(t, getTestUserGroup(), phRes)
+ assertLimits(t, getTestUserGroup(), phRes)
// wait for events to be processed
err = common.WaitFor(10*time.Millisecond, time.Second, func() bool {
@@ -2567,7 +2640,7 @@ func TestPlaceholderSmallerThanReal(t *testing.T) {
assert.Equal(t, phID, record.ObjectID, "incorrect allocation ID,
expected placeholder alloc ID")
assert.Equal(t, appID1, record.ReferenceID, "event should reference
application ID")
assert.Assert(t, strings.Contains(record.Message, "Task group 'tg-1' in
application 'app-1'"), "unexpected message in record")
- assertUserGroupResource(t, getTestUserGroup(), phRes)
+ assertLimits(t, getTestUserGroup(), phRes)
// release placeholder: do what the context would do after the shim
processing
release := &si.AllocationRelease{
@@ -2581,7 +2654,7 @@ func TestPlaceholderSmallerThanReal(t *testing.T) {
assert.Assert(t, resources.IsZero(node.GetAllocatedResource()),
"nothing should be allocated on node")
assert.Assert(t,
resources.IsZero(app.GetQueue().GetAllocatedResource()), "nothing should be
allocated on queue")
assert.Equal(t, 0, partition.GetTotalAllocationCount(), "no allocation
should be registered on the partition")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0,
"second": 0}))
+ assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0,
"second": 0}))
}
// one real allocation should trigger cleanup of all placeholders
@@ -2623,7 +2696,7 @@ func TestPlaceholderSmallerMulti(t *testing.T) {
assert.Assert(t, resources.Equals(tgRes,
app.GetQueue().GetAllocatedResource()), "all placeholders should be allocated
on queue")
assert.Assert(t, resources.Equals(tgRes, node.GetAllocatedResource()),
"all placeholders should be allocated on node")
assert.Equal(t, phCount, partition.GetTotalAllocationCount(),
"placeholder allocation should be registered on the partition")
- assertUserGroupResource(t, getTestUserGroup(), tgRes)
+ assertLimits(t, getTestUserGroup(), tgRes)
// add an ask which is larger than the placeholder
ask := newAllocationAskTG(allocID, appID1, taskGroup, tgRes, false)
@@ -2649,7 +2722,7 @@ func TestPlaceholderSmallerMulti(t *testing.T) {
t.Fatal("collecting eventChannel should return something")
}
assert.Equal(t, phCount, len(records), "expecting %d events for
placeholder mismatch", phCount)
- assertUserGroupResource(t, getTestUserGroup(), tgRes)
+ assertLimits(t, getTestUserGroup(), tgRes)
// release placeholders: do what the context would do after the shim
processing
for id, ph := range phs {
@@ -2666,7 +2739,7 @@ func TestPlaceholderSmallerMulti(t *testing.T) {
assert.Assert(t, resources.IsZero(node.GetAllocatedResource()),
"nothing should be allocated on node")
assert.Assert(t,
resources.IsZero(app.GetQueue().GetAllocatedResource()), "nothing should be
allocated on queue")
assert.Equal(t, 0, partition.GetTotalAllocationCount(), "no allocation
should be registered on the partition")
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0,
"second": 0}))
+ assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0,
"second": 0}))
}
func TestPlaceholderBiggerThanReal(t *testing.T) {
@@ -2701,7 +2774,7 @@ func TestPlaceholderBiggerThanReal(t *testing.T) {
assert.Assert(t, resources.Equals(phRes,
app.GetQueue().GetAllocatedResource()), "placeholder size should be allocated
on queue")
assert.Assert(t, resources.Equals(phRes, node.GetAllocatedResource()),
"placeholder size should be allocated on node")
assert.Equal(t, 1, partition.GetTotalAllocationCount(), "placeholder
allocation should be registered on the partition")
- assertUserGroupResource(t, getTestUserGroup(), phRes)
+ assertLimits(t, getTestUserGroup(), phRes)
// add a new ask with smaller request and allocate
ask = newAllocationAskTG(allocID, appID1, taskGroup, smallRes, false)
@@ -2716,7 +2789,7 @@ func TestPlaceholderBiggerThanReal(t *testing.T) {
// no updates yet on queue and node
assert.Assert(t, resources.Equals(phRes,
app.GetQueue().GetAllocatedResource()), "placeholder size should still be
allocated on queue")
assert.Assert(t, resources.Equals(phRes, node.GetAllocatedResource()),
"placeholder size should still be allocated on node")
- assertUserGroupResource(t, getTestUserGroup(), phRes)
+ assertLimits(t, getTestUserGroup(), phRes)
// replace the placeholder: do what the context would do after the shim
processing
release := &si.AllocationRelease{
@@ -2733,7 +2806,7 @@ func TestPlaceholderBiggerThanReal(t *testing.T) {
assert.Equal(t, 1, partition.GetTotalAllocationCount(), "real
allocation should be registered on the partition")
assert.Assert(t, resources.Equals(smallRes,
app.GetQueue().GetAllocatedResource()), "real size should be allocated on
queue")
assert.Assert(t, resources.Equals(smallRes,
node.GetAllocatedResource()), "real size should be allocated on node")
- assertUserGroupResource(t, getTestUserGroup(), smallRes)
+ assertLimits(t, getTestUserGroup(), smallRes)
}
func TestPlaceholderMatch(t *testing.T) {
@@ -2762,7 +2835,7 @@ func TestPlaceholderMatch(t *testing.T) {
phUUID := ph.GetUUID()
assert.Equal(t, phID, ph.GetAllocationKey(), "expected allocation of
ph-1 to be returned")
assert.Equal(t, 1, len(app.GetAllPlaceholderData()), "placeholder data
should be created on allocate")
- assertUserGroupResource(t, getTestUserGroup(), phRes)
+ assertLimits(t, getTestUserGroup(), phRes)
// add a new ask with an unknown task group (should allocate directly)
ask = newAllocationAskTG(allocID, appID1, "unknown", phRes, false)
@@ -2776,7 +2849,7 @@ func TestPlaceholderMatch(t *testing.T) {
assert.Equal(t, 1, len(app.GetAllPlaceholderData()), "placeholder data
should not be updated")
assert.Equal(t, int64(1), app.GetAllPlaceholderData()[0].Count,
"placeholder data should show 1 available placeholder")
assert.Equal(t, int64(0), app.GetAllPlaceholderData()[0].Replaced,
"placeholder data should show no replacements")
- assertUserGroupResource(t, getTestUserGroup(),
resources.Multiply(phRes, 2))
+ assertLimits(t, getTestUserGroup(), resources.Multiply(phRes, 2))
// add a new ask the same task group as the placeholder
ask = newAllocationAskTG(allocID2, appID1, taskGroup, phRes, false)
@@ -2789,7 +2862,7 @@ func TestPlaceholderMatch(t *testing.T) {
assert.Equal(t, 1, len(app.GetAllPlaceholderData()), "placeholder data
should not be updated")
assert.Equal(t, int64(1), app.GetAllPlaceholderData()[0].Count,
"placeholder data should show 1 available placeholder")
assert.Equal(t, int64(0), app.GetAllPlaceholderData()[0].Replaced,
"placeholder data should show no replacements")
- assertUserGroupResource(t, getTestUserGroup(),
resources.Multiply(phRes, 2))
+ assertLimits(t, getTestUserGroup(), resources.Multiply(phRes, 2))
// replace the placeholder should work
alloc = partition.tryPlaceholderAllocate()
@@ -2812,7 +2885,7 @@ func TestPlaceholderMatch(t *testing.T) {
t.Fatal("confirmed allocation should not be nil")
}
assert.Equal(t, int64(1), app.GetAllPlaceholderData()[0].Replaced,
"placeholder data should show the replacement")
- assertUserGroupResource(t, getTestUserGroup(),
resources.Multiply(phRes, 2))
+ assertLimits(t, getTestUserGroup(), resources.Multiply(phRes, 2))
// add a new ask the same task group as the placeholder
// all placeholders are used so direct allocation is expected
@@ -2823,7 +2896,7 @@ func TestPlaceholderMatch(t *testing.T) {
if alloc == nil {
t.Fatal("expected ask to be allocated no placeholders left")
}
- assertUserGroupResource(t, getTestUserGroup(),
resources.Multiply(phRes, 3))
+ assertLimits(t, getTestUserGroup(), resources.Multiply(phRes, 3))
}
// simple direct replace with one node
@@ -2857,7 +2930,7 @@ func TestTryPlaceholderAllocate(t *testing.T) {
if alloc != nil {
t.Fatalf("placeholder ask should not be allocated: %s", alloc)
}
- assertUserGroupResource(t, getTestUserGroup(), nil)
+ assertLimits(t, getTestUserGroup(), nil)
// try to allocate a placeholder via normal allocate
alloc = partition.tryAllocate()
if alloc == nil {
@@ -2870,7 +2943,7 @@ func TestTryPlaceholderAllocate(t *testing.T) {
t.Fatalf("placeholder allocation not updated as expected: got
%s, expected %s", app.GetPlaceholderResource(), res)
}
assert.Equal(t, partition.GetTotalAllocationCount(), 1, "placeholder
allocation should be counted as alloc")
- assertUserGroupResource(t, getTestUserGroup(), res)
+ assertLimits(t, getTestUserGroup(), res)
// add a second ph ask and run it again it should not match the already
allocated placeholder
ask = newAllocationAskTG("ph-2", appID1, taskGroup, res, true)
err = app.AddAllocationAsk(ask)
@@ -2880,7 +2953,7 @@ func TestTryPlaceholderAllocate(t *testing.T) {
if alloc != nil {
t.Fatalf("placeholder ask should not be allocated: %s", alloc)
}
- assertUserGroupResource(t, getTestUserGroup(), res)
+ assertLimits(t, getTestUserGroup(), res)
alloc = partition.tryAllocate()
if alloc == nil {
t.Fatal("expected 2nd placeholder to be allocated")
@@ -2890,7 +2963,7 @@ func TestTryPlaceholderAllocate(t *testing.T) {
t.Fatalf("placeholder allocation not updated as expected: got
%s, expected %s", app.GetPlaceholderResource(), resources.Multiply(res, 2))
}
assert.Equal(t, partition.GetTotalAllocationCount(), 2, "placeholder
allocation should be counted as alloc")
- assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res,
2))
+ assertLimits(t, getTestUserGroup(), resources.Multiply(res, 2))
// not mapping to the same taskgroup should not do anything
ask = newAllocationAskTG(allocID, appID1, "tg-unk", res, false)
@@ -2900,7 +2973,7 @@ func TestTryPlaceholderAllocate(t *testing.T) {
if alloc != nil {
t.Fatalf("allocation should not have matched placeholder: %s",
alloc)
}
- assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res,
2))
+ assertLimits(t, getTestUserGroup(), resources.Multiply(res, 2))
// add an ask with the TG
ask = newAllocationAskTG(allocID2, appID1, taskGroup, res, false)
@@ -2913,7 +2986,7 @@ func TestTryPlaceholderAllocate(t *testing.T) {
assert.Equal(t, partition.GetTotalAllocationCount(), 2, "placeholder
replacement should not be counted as alloc")
assert.Equal(t, alloc.GetResult(), objects.Replaced, "result is not the
expected allocated replaced")
assert.Equal(t, alloc.GetReleaseCount(), 1, "released allocations
should have been 1")
- assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res,
2))
+ assertLimits(t, getTestUserGroup(), resources.Multiply(res, 2))
phUUID := alloc.GetFirstRelease().GetUUID()
// placeholder is not released until confirmed by the shim
if !resources.Equals(app.GetPlaceholderResource(),
resources.Multiply(res, 2)) {
@@ -2940,7 +3013,7 @@ func TestTryPlaceholderAllocate(t *testing.T) {
if !resources.Equals(app.GetAllocatedResource(), res) {
t.Fatalf("allocations not updated as expected: got %s, expected
%s", app.GetAllocatedResource(), res)
}
- assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res,
2))
+ assertLimits(t, getTestUserGroup(), resources.Multiply(res, 2))
}
// The failure is triggered by the predicate plugin and is hidden in the alloc
handling
@@ -2985,11 +3058,11 @@ func TestFailReplacePlaceholder(t *testing.T) {
if !resources.Equals(app.GetPlaceholderResource(), res) {
t.Fatalf("placeholder allocation not updated as expected: got
%s, expected %s", app.GetPlaceholderResource(), res)
}
- assertUserGroupResource(t, getTestUserGroup(), res)
+ assertLimits(t, getTestUserGroup(), res)
// add 2nd node to allow allocation
node2 := setupNode(t, nodeID2, partition, tgRes)
- assertUserGroupResource(t, getTestUserGroup(), res)
+ assertLimits(t, getTestUserGroup(), res)
// add an ask with the TG
ask = newAllocationAskTG(allocID, appID1, taskGroup, res, false)
err = app.AddAllocationAsk(ask)
@@ -3016,7 +3089,7 @@ func TestFailReplacePlaceholder(t *testing.T) {
if !resources.Equals(app.GetPlaceholderResource(), res) {
t.Fatalf("placeholder allocation not updated as expected: got
%s, expected %s", app.GetPlaceholderResource(), resources.Multiply(res, 2))
}
- assertUserGroupResource(t, getTestUserGroup(), res)
+ assertLimits(t, getTestUserGroup(), res)
// release placeholder: do what the context would do after the shim
processing
release := &si.AllocationRelease{
@@ -3040,7 +3113,7 @@ func TestFailReplacePlaceholder(t *testing.T) {
if !resources.Equals(node2.GetAllocatedResource(), res) {
t.Fatalf("node-2 allocations not set as expected: got %s,
expected %s", node2.GetAllocatedResource(), res)
}
- assertUserGroupResource(t, getTestUserGroup(), res)
+ assertLimits(t, getTestUserGroup(), res)
}
func TestAddAllocationAsk(t *testing.T) {
@@ -3081,7 +3154,7 @@ func TestAddAllocationAsk(t *testing.T) {
if !resources.Equals(app.GetPendingResource(), res) {
t.Fatal("app not updated by adding ask, no error thrown")
}
- assertUserGroupResource(t, getTestUserGroup(), nil)
+ assertLimits(t, getTestUserGroup(), nil)
}
func TestRemoveAllocationAsk(t *testing.T) {
@@ -3134,7 +3207,7 @@ func TestRemoveAllocationAsk(t *testing.T) {
release.TerminationType = si.TerminationType_STOPPED_BY_RM
partition.removeAllocationAsk(release)
assert.Assert(t, resources.IsZero(app.GetPendingResource()), "app
should not have pending asks")
- assertUserGroupResource(t, getTestUserGroup(), nil)
+ assertLimits(t, getTestUserGroup(), nil)
}
func TestUpdateNodeSortingPolicy(t *testing.T) {
diff --git a/pkg/scheduler/ugm/group_tracker.go
b/pkg/scheduler/ugm/group_tracker.go
index 12f84e5f..6a4f99b2 100644
--- a/pkg/scheduler/ugm/group_tracker.go
+++ b/pkg/scheduler/ugm/group_tracker.go
@@ -65,6 +65,18 @@ func (gt *GroupTracker) getTrackedApplications()
map[string]bool {
return gt.applications
}
+func (gt *GroupTracker) setMaxApplications(count uint64, queuePath string)
error {
+ gt.Lock()
+ defer gt.Unlock()
+ return gt.queueTracker.setMaxApplications(count, queuePath)
+}
+
+func (gt *GroupTracker) setMaxResources(resource *resources.Resource,
queuePath string) error {
+ gt.Lock()
+ defer gt.Unlock()
+ return gt.queueTracker.setMaxResources(resource, queuePath)
+}
+
func (gt *GroupTracker) GetGroupResourceUsageDAOInfo()
*dao.GroupResourceUsageDAOInfo {
gt.RLock()
defer gt.RUnlock()
diff --git a/pkg/scheduler/ugm/group_tracker_test.go
b/pkg/scheduler/ugm/group_tracker_test.go
index 16757b2e..f9608b21 100644
--- a/pkg/scheduler/ugm/group_tracker_test.go
+++ b/pkg/scheduler/ugm/group_tracker_test.go
@@ -161,6 +161,50 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
assert.Equal(t, removeQT, true, "wrong remove queue tracker value")
}
+func TestGTSetMaxLimits(t *testing.T) {
+ // Queue setup:
+ // root->parent->child1
+ user := security.UserGroup{User: "test", Groups: []string{"test"}}
+ groupTracker := newGroupTracker(user.User)
+ usage1, err := resources.NewResourceFromConf(map[string]string{"mem":
"10M", "vcore": "10"})
+ if err != nil {
+ t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage1)
+ }
+ err = groupTracker.increaseTrackedResource(queuePath1, TestApp1, usage1)
+ if err != nil {
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", queuePath1, TestApp1, usage1, err)
+ }
+
+ setMaxAppsErr := groupTracker.setMaxApplications(1, queuePath1)
+ assert.NilError(t, setMaxAppsErr)
+
+ setMaxResourcesErr := groupTracker.setMaxResources(usage1, queuePath1)
+ assert.NilError(t, setMaxResourcesErr)
+
+ setParentMaxAppsErr := groupTracker.setMaxApplications(1, "root.parent")
+ assert.NilError(t, setParentMaxAppsErr)
+
+ setParentMaxResourcesErr := groupTracker.setMaxResources(usage1,
"root.parent")
+ assert.NilError(t, setParentMaxResourcesErr)
+
+ err = groupTracker.increaseTrackedResource(queuePath1, TestApp2, usage1)
+ if err != nil {
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", queuePath1, TestApp1, usage1, err)
+ }
+
+ setMaxAppsErr1 := groupTracker.setMaxApplications(1, queuePath1)
+ assert.Error(t, setMaxAppsErr1, "current running applications is
greater than config max applications for "+queuePath1)
+
+ setMaxResourcesErr1 := groupTracker.setMaxResources(usage1, queuePath1)
+ assert.Error(t, setMaxResourcesErr1, "current resource usage is greater
than config max resource for "+queuePath1)
+
+ setParentMaxAppsErr1 := groupTracker.setMaxApplications(1,
"root.parent")
+ assert.Error(t, setParentMaxAppsErr1, "current running applications is
greater than config max applications for root.parent")
+
+ setParentMaxResourcesErr1 := groupTracker.setMaxResources(usage1,
"root.parent")
+ assert.Error(t, setParentMaxResourcesErr1, "current resource usage is
greater than config max resource for root.parent")
+}
+
func getGroupResource(gt *GroupTracker) map[string]*resources.Resource {
resources := make(map[string]*resources.Resource)
usage := gt.GetGroupResourceUsageDAOInfo()
diff --git a/pkg/scheduler/ugm/manager.go b/pkg/scheduler/ugm/manager.go
index 2862c114..f6716150 100644
--- a/pkg/scheduler/ugm/manager.go
+++ b/pkg/scheduler/ugm/manager.go
@@ -24,6 +24,7 @@ import (
"go.uber.org/zap"
+ "github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/common/security"
"github.com/apache/yunikorn-core/pkg/log"
@@ -32,19 +33,25 @@ import (
var once sync.Once
var m *Manager
+const maxresources = "maxresources"
+const maxapplications = "maxapplications"
+
// Manager implements tracker. A User Group Manager to track the usage for
both user and groups.
// Holds object of both user and group trackers
type Manager struct {
- userTrackers map[string]*UserTracker
- groupTrackers map[string]*GroupTracker
- lock sync.RWMutex
+ userTrackers map[string]*UserTracker
+ groupTrackers map[string]*GroupTracker
+ userLimitsConfig map[string]map[string]map[string]interface{} // Hold
limits settings of user * queue path
+ groupLimitsConfig map[string]map[string]map[string]interface{} // Hold
limits settings of group * queue path
+ sync.RWMutex
}
func newManager() *Manager {
manager := &Manager{
- userTrackers: make(map[string]*UserTracker),
- groupTrackers: make(map[string]*GroupTracker),
- lock: sync.RWMutex{},
+ userTrackers: make(map[string]*UserTracker),
+ groupTrackers: make(map[string]*GroupTracker),
+ userLimitsConfig:
make(map[string]map[string]map[string]interface{}),
+ groupLimitsConfig:
make(map[string]map[string]map[string]interface{}),
}
return manager
}
@@ -73,11 +80,66 @@ func (m *Manager) IncreaseTrackedResource(queuePath string,
applicationID string
return fmt.Errorf("mandatory parameters are missing. queuepath:
%s, application id: %s, resource usage: %s, user: %s",
queuePath, applicationID, usage.String(), user.User)
}
- m.lock.Lock()
- defer m.lock.Unlock()
+ m.Lock()
+ defer m.Unlock()
var userTracker *UserTracker
if m.userTrackers[user.User] == nil {
- userTracker = newUserTracker(user)
+ userTracker = newUserTracker(user.User)
+
+ // Set the limits for all configured queue paths of the user
+ for configQueuePath, config := range
m.userLimitsConfig[user.User] {
+ log.Logger().Debug("Setting the limit max applications
settings.",
+ zap.String("user", user.User),
+ zap.String("queue path", configQueuePath))
+ maxApps, ok := config[maxapplications].(uint64)
+ if !ok {
+ log.Logger().Warn("Problem in setting the limit
max applications settings. Unable to cast the value from interface to uint64",
+ zap.String("user", user.User),
+ zap.String("queue path",
configQueuePath),
+ zap.Uint64("limit max applications",
maxApps))
+ return fmt.Errorf("unable to set the max
applications. user: %s, queuepath : %s, applicationid: %s",
+ user.User, configQueuePath,
applicationID)
+ }
+ err := userTracker.setMaxApplications(maxApps,
configQueuePath)
+ if err != nil {
+ log.Logger().Warn("Problem in setting the limit
max applications settings.",
+ zap.String("user", user.User),
+ zap.String("queue path",
configQueuePath),
+ zap.Uint64("limit max applications",
maxApps),
+ zap.Error(err))
+ return fmt.Errorf("unable to set the max
applications. user: %s, queuepath : %s, applicationid: %s, usage: %s, reason:
%w",
+ user.User, configQueuePath,
applicationID, usage.String(), err)
+ }
+ maxResources, ok :=
config[maxresources].(map[string]string)
+ if !ok {
+ log.Logger().Warn("Problem in setting the limit
max resources settings. Unable to cast the value from interface to resource",
+ zap.String("user", user.User),
+ zap.String("queue path",
configQueuePath),
+ zap.Any("limit max resources",
maxResources))
+ return fmt.Errorf("unable to set the max
resources. user: %s, queuepath : %s, applicationid: %s",
+ user.User, configQueuePath,
applicationID)
+ }
+ resource, resourceErr :=
resources.NewResourceFromConf(maxResources)
+ if resourceErr != nil {
+ log.Logger().Warn("Problem in setting the limit
max resources settings.",
+ zap.String("user", user.User),
+ zap.String("queue path",
configQueuePath),
+ zap.Any("limit max resources",
maxResources),
+ zap.Error(resourceErr))
+ return fmt.Errorf("unable to set the max
resources. user: %s, queuepath : %s, applicationid: %s, usage: %s, reason: %w",
+ user.User, configQueuePath,
applicationID, usage.String(), resourceErr)
+ }
+ setMaxResourcesErr :=
userTracker.setMaxResources(resource, configQueuePath)
+ if setMaxResourcesErr != nil {
+ log.Logger().Warn("Problem in setting the limit
max resources settings.",
+ zap.String("user", user.User),
+ zap.String("queue path",
configQueuePath),
+ zap.Any("limit max resources",
maxResources),
+ zap.Error(setMaxResourcesErr))
+ return fmt.Errorf("unable to set the max
resources. user: %s, queuepath : %s, applicationid: %s, usage: %s, reason: %w",
+ user.User, configQueuePath,
applicationID, usage.String(), setMaxResourcesErr)
+ }
+ }
m.userTrackers[user.User] = userTracker
} else {
userTracker = m.userTrackers[user.User]
@@ -136,8 +198,8 @@ func (m *Manager) DecreaseTrackedResource(queuePath string,
applicationID string
return fmt.Errorf("mandatory parameters are missing. queuepath:
%s, application id: %s, resource usage: %s, user: %s",
queuePath, applicationID, usage.String(), user.User)
}
- m.lock.Lock()
- defer m.lock.Unlock()
+ m.Lock()
+ defer m.Unlock()
userTracker := m.userTrackers[user.User]
if userTracker != nil {
removeQT, err := userTracker.decreaseTrackedResource(queuePath,
applicationID, usage, removeApp)
@@ -190,26 +252,28 @@ func (m *Manager) DecreaseTrackedResource(queuePath
string, applicationID string
}
func (m *Manager) GetUserResources(user security.UserGroup)
*resources.Resource {
- m.lock.RLock()
- defer m.lock.RUnlock()
- if m.userTrackers[user.User] != nil {
- return m.userTrackers[user.User].queueTracker.resourceUsage
+ m.RLock()
+ defer m.RUnlock()
+ ut := m.userTrackers[user.User]
+ if ut != nil &&
len(ut.GetUserResourceUsageDAOInfo().Queues.ResourceUsage.Resources) > 0 {
+ return ut.GetUserResourceUsageDAOInfo().Queues.ResourceUsage
}
return nil
}
func (m *Manager) GetGroupResources(group string) *resources.Resource {
- m.lock.RLock()
- defer m.lock.RUnlock()
- if m.groupTrackers[group] != nil {
- return m.groupTrackers[group].queueTracker.resourceUsage
+ m.RLock()
+ defer m.RUnlock()
+ gt := m.groupTrackers[group]
+ if gt != nil &&
len(gt.GetGroupResourceUsageDAOInfo().Queues.ResourceUsage.Resources) > 0 {
+ return gt.GetGroupResourceUsageDAOInfo().Queues.ResourceUsage
}
return nil
}
func (m *Manager) GetUsersResources() []*UserTracker {
- m.lock.RLock()
- defer m.lock.RUnlock()
+ m.RLock()
+ defer m.RUnlock()
var userTrackers []*UserTracker
for _, tracker := range m.userTrackers {
userTrackers = append(userTrackers, tracker)
@@ -218,8 +282,8 @@ func (m *Manager) GetUsersResources() []*UserTracker {
}
func (m *Manager) GetUserTracker(user string) *UserTracker {
- m.lock.RLock()
- defer m.lock.RUnlock()
+ m.RLock()
+ defer m.RUnlock()
if m.userTrackers[user] != nil {
return m.userTrackers[user]
}
@@ -227,8 +291,8 @@ func (m *Manager) GetUserTracker(user string) *UserTracker {
}
func (m *Manager) GetGroupsResources() []*GroupTracker {
- m.lock.RLock()
- defer m.lock.RUnlock()
+ m.RLock()
+ defer m.RUnlock()
var groupTrackers []*GroupTracker
for _, tracker := range m.groupTrackers {
groupTrackers = append(groupTrackers, tracker)
@@ -237,8 +301,8 @@ func (m *Manager) GetGroupsResources() []*GroupTracker {
}
func (m *Manager) GetGroupTracker(group string) *GroupTracker {
- m.lock.RLock()
- defer m.lock.RUnlock()
+ m.RLock()
+ defer m.RUnlock()
if m.groupTrackers[group] != nil {
return m.groupTrackers[group]
}
@@ -260,6 +324,60 @@ func (m *Manager) ensureGroupTrackerForApp(queuePath
string, applicationID strin
zap.String("user", user.User),
zap.String("group", group))
groupTracker = newGroupTracker(group)
+
+ // Set the limits for all configured queue paths of the
group
+ for configQueuePath, config := range
m.groupLimitsConfig[group] {
+ log.Logger().Debug("Setting the limit max
applications settings.",
+ zap.String("group", group),
+ zap.String("queue path",
configQueuePath))
+ maxApps, ok := config[maxapplications].(uint64)
+ if !ok {
+ log.Logger().Warn("Problem in setting
the limit max applications settings. Unable to cast the value from interface to
uint64",
+ zap.String("group", group),
+ zap.String("queue path",
configQueuePath),
+ zap.Uint64("limit max
applications", maxApps))
+ return fmt.Errorf("unable to set the
max applications. group: %s, queuepath : %s, applicationid: %s",
+ group, configQueuePath,
applicationID)
+ }
+ if setMaxApplicationsErr :=
groupTracker.setMaxApplications(maxApps, configQueuePath);
setMaxApplicationsErr != nil {
+ log.Logger().Warn("Problem in setting
the limit max applications settings.",
+ zap.String("group", group),
+ zap.String("queue path",
configQueuePath),
+ zap.Uint64("limit max
applications", maxApps),
+
zap.Error(setMaxApplicationsErr))
+ return fmt.Errorf("unable to set the
max applications. group: %s, queuepath : %s, applicationid: %s, reason: %w",
+ group, configQueuePath,
applicationID, setMaxApplicationsErr)
+ }
+
+ maxResources, ok :=
config[maxresources].(map[string]string)
+ if !ok {
+ log.Logger().Warn("Problem in setting
the limit max resources settings. Unable to cast the value from interface to
resource",
+ zap.String("group", group),
+ zap.String("queue path",
configQueuePath),
+ zap.Any("limit max resources",
maxResources))
+ return fmt.Errorf("unable to set the
max resources. group: %s, queuepath : %s, applicationid: %s",
+ group, configQueuePath,
applicationID)
+ }
+ resource, resourceErr :=
resources.NewResourceFromConf(maxResources)
+ if resourceErr != nil {
+ log.Logger().Warn("Problem in setting
the limit max resources settings.",
+ zap.String("group", group),
+ zap.String("queue path",
configQueuePath),
+ zap.Any("limit max resources",
maxResources),
+ zap.Error(resourceErr))
+ return fmt.Errorf("unable to set the
max resources. group: %s, queuepath : %s, applicationid: %s, reason: %w",
+ group, configQueuePath,
applicationID, resourceErr)
+ }
+ if setMaxResourcesErr :=
groupTracker.setMaxResources(resource, configQueuePath); setMaxResourcesErr !=
nil {
+ log.Logger().Warn("Problem in setting
the limit max resources settings.",
+ zap.String("group", group),
+ zap.String("queue path",
configQueuePath),
+ zap.Any("limit max resources",
maxResources),
+ zap.Error(setMaxResourcesErr))
+ return fmt.Errorf("unable to set the
max resources. group: %s, queuepath : %s, applicationid: %s, reason: %w",
+ group, configQueuePath,
applicationID, setMaxResourcesErr)
+ }
+ }
m.groupTrackers[group] = groupTracker
} else {
log.Logger().Debug("Group tracker already exists and
linking (reusing) the same with application",
@@ -286,8 +404,8 @@ func (m *Manager) getGroup(user security.UserGroup)
(string, error) {
// cleaner Auto wakeup go routine to remove the user and group trackers based
on applications being tracked upon, its root queueTracker usage etc
// nolint:unused
func (m *Manager) cleaner() {
- m.lock.Lock()
- defer m.lock.Unlock()
+ m.Lock()
+ defer m.Unlock()
for user, ut := range m.userTrackers {
if m.isUserRemovable(ut) {
delete(m.userTrackers, user)
@@ -314,15 +432,290 @@ func (m *Manager) isGroupRemovable(gt *GroupTracker)
bool {
return false
}
+func (m *Manager) UpdateConfig(config configs.QueueConfig, queuePath string)
error {
+ m.Lock()
+ defer m.Unlock()
+
+ // clear the local limit config maps before processing the limit config
+ m.userLimitsConfig = make(map[string]map[string]map[string]interface{})
+ m.groupLimitsConfig = make(map[string]map[string]map[string]interface{})
+ return m.internalProcessConfig(config, queuePath)
+}
+
+func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath
string) error {
+ // Holds user and group for which limits have been configured with
specific queue path
+ userGroupLimits := make(map[string]bool)
+ // Traverse limits of specific queue path
+ for _, limit := range cur.Limits {
+ tempLimitsMap := make(map[string]interface{})
+ tempLimitsMap[maxresources] = limit.MaxResources
+ tempLimitsMap[maxapplications] = limit.MaxApplications
+ for _, user := range limit.Users {
+ log.Logger().Debug("Processing user limits
configuration",
+ zap.String("user", user),
+ zap.String("limit", limit.Limit),
+ zap.String("queue path", queuePath),
+ zap.Uint64("max application",
limit.MaxApplications),
+ zap.Any("max resources", limit.MaxResources))
+ tempUserMap := make(map[string]map[string]interface{})
+ tempUserMap[queuePath] = tempLimitsMap
+ if err := m.processUserConfig(user, limit, queuePath,
userGroupLimits, tempLimitsMap, tempUserMap); err != nil {
+ return err
+ }
+ }
+ for _, group := range limit.Groups {
+ log.Logger().Debug("Processing group limits
configuration",
+ zap.String("group", group),
+ zap.String("limit", limit.Limit),
+ zap.String("queue path", queuePath),
+ zap.Uint64("max application",
limit.MaxApplications),
+ zap.Any("max resources", limit.MaxResources))
+ tempGroupMap := make(map[string]map[string]interface{})
+ tempGroupMap[queuePath] = tempLimitsMap
+ if err := m.processGroupConfig(group, limit, queuePath,
userGroupLimits, tempLimitsMap, tempGroupMap); err != nil {
+ return err
+ }
+ }
+ }
+ if err := m.clearEarlierSetLimits(userGroupLimits, queuePath); err !=
nil {
+ return err
+ }
+
+ if len(cur.Queues) > 0 {
+ for _, child := range cur.Queues {
+ childQueuePath := queuePath + configs.DOT + child.Name
+ if err := m.internalProcessConfig(child,
childQueuePath); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func (m *Manager) processUserConfig(user string, limit configs.Limit,
queuePath string, userGroupLimits map[string]bool, tempLimitsMap
map[string]interface{}, tempUserMap map[string]map[string]interface{}) error {
+ if user == "*" {
+ // traverse all tracked users
+ for u, ut := range m.userTrackers {
+ // Is this user already tracked for the queue path?
+ if m.IsQueuePathTrackedCompletely(ut.queueTracker,
queuePath) {
+ log.Logger().Debug("Processing wild card user
limits configuration for all existing users",
+ zap.String("user", u),
+ zap.String("limit", limit.Limit),
+ zap.String("queue path", queuePath),
+ zap.Uint64("max application",
limit.MaxApplications),
+ zap.Any("max resources",
limit.MaxResources))
+
+ // creates an entry for the user being
processed always as it has been cleaned before
+ if _, ok := m.userLimitsConfig[u]; ok {
+ m.userLimitsConfig[u][queuePath] =
tempLimitsMap
+ } else {
+ m.userLimitsConfig[u] = tempUserMap
+ }
+ if err := m.setUserLimits(u, limit, queuePath);
err != nil {
+ return err
+ }
+ userGroupLimits[u] = true
+ }
+ }
+ } else {
+ // creates an entry for the user being processed always as it
has been cleaned before
+ if _, ok := m.userLimitsConfig[user]; ok {
+ m.userLimitsConfig[user][queuePath] = tempLimitsMap
+ } else {
+ m.userLimitsConfig[user] = tempUserMap
+ }
+ if err := m.setUserLimits(user, limit, queuePath); err != nil {
+ return err
+ }
+ userGroupLimits[user] = true
+ }
+ return nil
+}
+
+func (m *Manager) processGroupConfig(group string, limit configs.Limit,
queuePath string, userGroupLimits map[string]bool, tempLimitsMap
map[string]interface{}, tempGroupMap map[string]map[string]interface{}) error {
+ if group == "*" {
+ // traverse all tracked groups
+ for g, gt := range m.groupTrackers {
+ // Is this group already tracked for the queue path?
+ if m.IsQueuePathTrackedCompletely(gt.queueTracker,
queuePath) {
+ log.Logger().Debug("Processing wild card user
limits configuration for all existing groups",
+ zap.String("group", g),
+ zap.String("limit", limit.Limit),
+ zap.String("queue path", queuePath),
+ zap.Uint64("max application",
limit.MaxApplications),
+ zap.Any("max resources",
limit.MaxResources))
+ // creates an entry for the group being
processed always as it has been cleaned before
+ if _, ok := m.groupLimitsConfig[g]; ok {
+ m.groupLimitsConfig[g][queuePath] =
tempLimitsMap
+ } else {
+ m.groupLimitsConfig[g] = tempGroupMap
+ }
+ if err := m.setGroupLimits(g, limit,
queuePath); err != nil {
+ return err
+ }
+ userGroupLimits[g] = true
+ }
+ }
+ } else {
+ // creates an entry for the group being processed always as it
has been cleaned before
+ if _, ok := m.groupLimitsConfig[group]; ok {
+ m.groupLimitsConfig[group][queuePath] = tempLimitsMap
+ } else {
+ m.groupLimitsConfig[group] = tempGroupMap
+ }
+ if err := m.setGroupLimits(group, limit, queuePath); err != nil
{
+ return err
+ }
+ userGroupLimits[group] = true
+ }
+ return nil
+}
+
+func (m *Manager) clearEarlierSetLimits(userGroupLimits map[string]bool,
queuePath string) error {
+ // Clear already configured limits of user for which limits have been
configured before but not now through #cur
+ for u, ut := range m.userTrackers {
+ // Is this user already tracked for the queue path?
+ if m.IsQueuePathTrackedCompletely(ut.queueTracker, queuePath) {
+ if _, ok := userGroupLimits[u]; !ok {
+ err :=
ut.setMaxResources(resources.NewResource(), queuePath)
+ if err != nil {
+ return err
+ }
+ err = ut.setMaxApplications(0, queuePath)
+ if err != nil {
+ return err
+ }
+ }
+ }
+ }
+
+ // Clear already configured limits of group for which limits have been
configured before but not now through #cur
+ for g, gt := range m.groupTrackers {
+ // Is this group already tracked for the queue path?
+ if m.IsQueuePathTrackedCompletely(gt.queueTracker, queuePath) {
+ if _, ok := userGroupLimits[g]; !ok {
+ if err :=
gt.setMaxResources(resources.NewResource(), queuePath); err != nil {
+ return err
+ }
+ if err := gt.setMaxApplications(0, queuePath);
err != nil {
+ return err
+ }
+ }
+ }
+ }
+ return nil
+}
+
+func (m *Manager) setUserLimits(user string, limit configs.Limit, queuePath
string) error {
+ log.Logger().Debug("Setting user limits",
+ zap.String("user", user),
+ zap.String("limit", limit.Limit),
+ zap.String("queue path", queuePath),
+ zap.Uint64("max application", limit.MaxApplications),
+ zap.Any("max resources", limit.MaxResources))
+ userTracker, ok := m.userTrackers[user]
+ if !ok {
+ log.Logger().Debug("User tracker does not exist. Creating user
tracker object to set the limit configuration",
+ zap.String("user", user),
+ zap.String("queue path", queuePath))
+ userTracker = newUserTracker(user)
+ m.userTrackers[user] = userTracker
+ }
+ if err := userTracker.setMaxApplications(limit.MaxApplications,
queuePath); err != nil {
+ log.Logger().Warn("Problem in setting the limit max
applications settings.",
+ zap.String("user", user),
+ zap.String("queue path", queuePath),
+ zap.Uint64("limit max applications",
limit.MaxApplications),
+ zap.Error(err))
+ return fmt.Errorf("unable to set the limit for user %s because
%w", user, err)
+ }
+
+ if resource, err := resources.NewResourceFromConf(limit.MaxResources);
err == nil {
+ if err = userTracker.setMaxResources(resource, queuePath); err
!= nil {
+ log.Logger().Warn("Problem in setting the limit max
resources settings.",
+ zap.String("user", user),
+ zap.String("queue path", queuePath),
+ zap.Any("limit max resources",
limit.MaxResources),
+ zap.Error(err))
+ return fmt.Errorf("unable to set the limit for user %s
because %w", user, err)
+ }
+ } else {
+ log.Logger().Warn("Problem in using the limit max resources
settings.",
+ zap.String("user", user),
+ zap.String("queue path", queuePath),
+ zap.Any("limit max resources", limit.MaxResources),
+ zap.Error(err))
+ return fmt.Errorf("unable to set the limit for user %s because
%w", user, err)
+ }
+ return nil
+}
+
+func (m *Manager) setGroupLimits(group string, limit configs.Limit, queuePath
string) error {
+ log.Logger().Debug("Setting group limits",
+ zap.String("group", group),
+ zap.String("limit", limit.Limit),
+ zap.String("queue path", queuePath),
+ zap.Uint64("max application", limit.MaxApplications),
+ zap.Any("max resources", limit.MaxResources))
+ groupTracker, ok := m.groupTrackers[group]
+ if !ok {
+ log.Logger().Debug("Group tracker does not exist. Creating
group tracker object to set the limit configuration",
+ zap.String("group", group),
+ zap.String("queue path", queuePath))
+ groupTracker = newGroupTracker(group)
+ m.groupTrackers[group] = groupTracker
+ }
+ if err := groupTracker.setMaxApplications(limit.MaxApplications,
queuePath); err != nil {
+ log.Logger().Warn("Problem in setting the limit max
applications settings.",
+ zap.String("group", group),
+ zap.String("queue path", queuePath),
+ zap.Uint64("limit max applications",
limit.MaxApplications),
+ zap.Error(err))
+ return fmt.Errorf("unable to set the limit for group %s because
%w", group, err)
+ }
+
+ if resource, err := resources.NewResourceFromConf(limit.MaxResources);
err == nil {
+ if err = groupTracker.setMaxResources(resource, queuePath); err
!= nil {
+ log.Logger().Warn("Problem in setting the limit max
resources settings.",
+ zap.String("group", group),
+ zap.String("queue path", queuePath),
+ zap.Any("limit max resources",
limit.MaxResources),
+ zap.Error(err))
+ return fmt.Errorf("unable to set the limit for group %s
because %w", group, err)
+ }
+ } else {
+ log.Logger().Warn("Problem in using the limit max resources
settings.",
+ zap.String("group", group),
+ zap.String("queue path", queuePath),
+ zap.Any("limit max resources", limit.MaxResources),
+ zap.Error(err))
+ return fmt.Errorf("unable to set the limit for group %s because
%w", group, err)
+ }
+ return nil
+}
+
+func (m *Manager) IsQueuePathTrackedCompletely(qt *QueueTracker, queuePath
string) bool {
+ if queuePath == configs.RootQueue || queuePath == qt.queueName {
+ return true
+ }
+ childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
+ if immediateChildQueueName != "" {
+ if childUt, ok :=
qt.childQueueTrackers[immediateChildQueueName]; ok {
+ return m.IsQueuePathTrackedCompletely(childUt,
childQueuePath)
+ }
+ }
+ return false
+}
+
// ClearUserTrackers only for tests
func (m *Manager) ClearUserTrackers() {
- m.lock.Lock()
- defer m.lock.Unlock()
+ m.Lock()
+ defer m.Unlock()
m.userTrackers = make(map[string]*UserTracker)
}
func (m *Manager) ClearGroupTrackers() {
- m.lock.Lock()
- defer m.lock.Unlock()
+ m.Lock()
+ defer m.Unlock()
m.groupTrackers = make(map[string]*GroupTracker)
}
diff --git a/pkg/scheduler/ugm/manager_test.go
b/pkg/scheduler/ugm/manager_test.go
index 908044c7..c8765553 100644
--- a/pkg/scheduler/ugm/manager_test.go
+++ b/pkg/scheduler/ugm/manager_test.go
@@ -24,6 +24,7 @@ import (
"gotest.tools/v3/assert"
+ "github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/common/security"
)
@@ -97,6 +98,20 @@ func TestAddRemoveUserAndGroups(t *testing.T) {
assert.Equal(t, user1.User, manager.GetUserTracker(user1.User).userName)
assert.Equal(t, user1.Groups[0],
manager.GetGroupTracker(user1.Groups[0]).groupName)
+ assert.Equal(t, true,
manager.IsQueuePathTrackedCompletely(manager.GetUserTracker(user.User).queueTracker,
queuePath1))
+ assert.Equal(t, true,
manager.IsQueuePathTrackedCompletely(manager.GetUserTracker(user1.User).queueTracker,
queuePath2))
+ assert.Equal(t, false,
manager.IsQueuePathTrackedCompletely(manager.GetUserTracker(user1.User).queueTracker,
queuePath1))
+ assert.Equal(t, false,
manager.IsQueuePathTrackedCompletely(manager.GetUserTracker(user.User).queueTracker,
queuePath2))
+ assert.Equal(t, false,
manager.IsQueuePathTrackedCompletely(manager.GetUserTracker(user.User).queueTracker,
queuePath3))
+ assert.Equal(t, false,
manager.IsQueuePathTrackedCompletely(manager.GetUserTracker(user.User).queueTracker,
queuePath4))
+
+ assert.Equal(t, true,
manager.IsQueuePathTrackedCompletely(manager.GetUserTracker(user.Groups[0]).queueTracker,
queuePath1))
+ assert.Equal(t, true,
manager.IsQueuePathTrackedCompletely(manager.GetUserTracker(user1.Groups[0]).queueTracker,
queuePath2))
+ assert.Equal(t, false,
manager.IsQueuePathTrackedCompletely(manager.GetUserTracker(user1.Groups[0]).queueTracker,
queuePath1))
+ assert.Equal(t, false,
manager.IsQueuePathTrackedCompletely(manager.GetUserTracker(user.Groups[0]).queueTracker,
queuePath2))
+ assert.Equal(t, false,
manager.IsQueuePathTrackedCompletely(manager.GetUserTracker(user.Groups[0]).queueTracker,
queuePath3))
+ assert.Equal(t, false,
manager.IsQueuePathTrackedCompletely(manager.GetUserTracker(user.Groups[0]).queueTracker,
queuePath4))
+
usage3, err := resources.NewResourceFromConf(map[string]string{"mem":
"5M", "vcore": "5"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
@@ -129,6 +144,313 @@ func TestAddRemoveUserAndGroups(t *testing.T) {
assert.Assert(t, manager.GetGroupTracker(user.Groups[0]) == nil)
}
+func TestIncreaseUserResourceWithInvalidConfig(t *testing.T) {
+ setupUGM()
+ // Queue setup:
+ // root->parent
+ user := security.UserGroup{User: "user1", Groups: []string{"group1"}}
+ manager := GetUserManager()
+
+ expectedResource, err :=
resources.NewResourceFromConf(map[string]string{"memory": "5", "vcores": "5"})
+ if err != nil {
+ t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, expectedResource)
+ }
+
+ m.userLimitsConfig[user.User] = make(map[string]map[string]interface{})
+ m.userLimitsConfig[user.User][queuePath1] = make(map[string]interface{})
+ m.userLimitsConfig[user.User][queuePath1][maxapplications] = -2
+ assert.Error(t, manager.IncreaseTrackedResource(queuePath1, TestApp1,
expectedResource, user), "unable to set the max applications. user:
"+user.User+", queuepath : "+queuePath1+", applicationid: "+TestApp1)
+
+ m.userLimitsConfig[user.User][queuePath1][maxapplications] = uint64(2)
+ m.userLimitsConfig[user.User][queuePath1][maxresources] =
make(map[string]interface{})
+ assert.Error(t, manager.IncreaseTrackedResource(queuePath1, TestApp1,
expectedResource, user), "unable to set the max resources. user: "+user.User+",
queuepath : "+queuePath1+", applicationid: "+TestApp1)
+
+ m.userLimitsConfig[user.User][queuePath1][maxapplications] = uint64(2)
+ m.userLimitsConfig[user.User][queuePath1][maxresources] =
map[string]string{"invalid": "-5", "vcores": "5"}
+ assert.Error(t, manager.IncreaseTrackedResource(queuePath1, TestApp1,
expectedResource, user), "unable to set the max resources. user: "+user.User+",
queuepath : "+queuePath1+", applicationid: "+TestApp1+", usage: map[memory:5
vcores:5], reason: invalid quantity")
+
+ m.userLimitsConfig[user.User][queuePath1][maxapplications] = uint64(2)
+ m.userLimitsConfig[user.User][queuePath1][maxresources] =
map[string]string{"memory": "5", "vcores": "5"}
+ m.groupLimitsConfig[user.Groups[0]] =
make(map[string]map[string]interface{})
+ m.groupLimitsConfig[user.Groups[0]][queuePath1] =
make(map[string]interface{})
+ m.groupLimitsConfig[user.Groups[0]][queuePath1][maxapplications] = -2
+ assert.Error(t, manager.IncreaseTrackedResource(queuePath1, TestApp1,
expectedResource, user), "unable to set the max applications. group:
"+user.Groups[0]+", queuepath : "+queuePath1+", applicationid: "+TestApp1)
+
+ m.userLimitsConfig[user.User][queuePath1][maxapplications] = uint64(2)
+ m.userLimitsConfig[user.User][queuePath1][maxresources] =
map[string]string{"memory": "5", "vcores": "5"}
+ m.groupLimitsConfig[user.Groups[0]][queuePath1][maxapplications] =
uint64(2)
+ m.groupLimitsConfig[user.Groups[0]][queuePath1][maxresources] =
make(map[string]interface{})
+ assert.Error(t, manager.IncreaseTrackedResource(queuePath1, TestApp1,
expectedResource, user), "unable to set the max resources. group:
"+user.Groups[0]+", queuepath : "+queuePath1+", applicationid: "+TestApp1)
+
+ m.userLimitsConfig[user.User][queuePath1][maxapplications] = uint64(2)
+ m.userLimitsConfig[user.User][queuePath1][maxresources] =
map[string]string{"memory": "5", "vcores": "5"}
+ m.groupLimitsConfig[user.Groups[0]][queuePath1][maxapplications] =
uint64(2)
+ m.groupLimitsConfig[user.Groups[0]][queuePath1][maxresources] =
map[string]string{"invalid": "-5", "vcores": "5"}
+ assert.Error(t, manager.IncreaseTrackedResource(queuePath1, TestApp1,
expectedResource, user), "unable to set the max resources. group:
"+user.Groups[0]+", queuepath : "+queuePath1+", applicationid: "+TestApp1+",
reason: invalid quantity")
+}
+
+func TestUpdateConfig(t *testing.T) {
+ setupUGM()
+ // Queue setup:
+ // root->parent
+ user := security.UserGroup{User: "user1", Groups: []string{"group1"}}
+ manager := GetUserManager()
+
+ expectedResource, err :=
resources.NewResourceFromConf(map[string]string{"memory": "5", "vcores": "5"})
+ if err != nil {
+ t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, expectedResource)
+ }
+
+ conf := createConfig(user.User, user.Groups[0], "memory", "-10")
+ assert.Error(t, manager.UpdateConfig(conf.Queues[0], "root"), "unable
to set the limit for user user1 because invalid quantity")
+ conf = createConfig(user.User, user.Groups[0], "invalid", "invalidate")
+ assert.Error(t, manager.UpdateConfig(conf.Queues[0], "root"), "unable
to set the limit for user user1 because invalid quantity")
+ conf = createUpdateConfig(user.User, user.Groups[0])
+
+ manager = GetUserManager()
+ assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+
+ assertMaxLimits(t, user, expectedResource, 1)
+
+ for i := 1; i <= 2; i++ {
+ err = manager.IncreaseTrackedResource(queuePath1, TestApp1,
expectedResource, user)
+ if err != nil {
+ t.Fatalf("unable to increase tracked resource:
queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1,
expectedResource, err)
+ }
+ }
+ assert.Error(t, manager.UpdateConfig(conf.Queues[0], "root"), "unable
to set the limit for user user1 because current resource usage is greater than
config max resource for root.parent")
+
+ err = manager.IncreaseTrackedResource(queuePath1, TestApp1,
expectedResource, user)
+ if err != nil {
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", queuePath1, TestApp1, expectedResource, err)
+ }
+ assert.Error(t, manager.UpdateConfig(conf.Queues[0], "root"), "unable
to set the limit for user user1 because current resource usage is greater than
config max resource for root")
+}
+
+func TestUpdateConfigWithWildCardUsersAndGroups(t *testing.T) {
+ setupUGM()
+ // Queue setup:
+ // root->parent
+ user := security.UserGroup{User: "user1", Groups: []string{"group1"}}
+ conf := createUpdateConfig(user.User, user.Groups[0])
+ manager := GetUserManager()
+ assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+
+ expectedResource, err :=
resources.NewResourceFromConf(map[string]string{"memory": "5", "vcores": "5"})
+ if err != nil {
+ t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, expectedResource)
+ }
+ assertMaxLimits(t, user, expectedResource, 1)
+
+ for i := 1; i <= 2; i++ {
+ err = manager.IncreaseTrackedResource(queuePath1, TestApp1,
expectedResource, user)
+ if err != nil {
+ t.Fatalf("unable to increase tracked resource:
queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1,
expectedResource, err)
+ }
+ }
+ assert.Error(t, manager.UpdateConfig(conf.Queues[0], "root"), "unable
to set the limit for user user1 because current resource usage is greater than
config max resource for root.parent")
+
+ err = manager.IncreaseTrackedResource(queuePath1, TestApp1,
expectedResource, user)
+ if err != nil {
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", queuePath1, TestApp1, expectedResource, err)
+ }
+ assert.Error(t, manager.UpdateConfig(conf.Queues[0], "root"), "unable
to set the limit for user user1 because current resource usage is greater than
config max resource for root")
+
+ user1 := security.UserGroup{User: "user2", Groups: []string{"group2"}}
+ conf = createUpdateConfigWithWildCardUsersAndGroups(user1.User,
user1.Groups[0])
+ assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+}
+
+func createUpdateConfigWithWildCardUsersAndGroups(user string, group string)
configs.PartitionConfig {
+ conf := configs.PartitionConfig{
+ Name: "test",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "root",
+ Parent: true,
+ SubmitACL: "*",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "parent",
+ Parent: true,
+ SubmitACL: "*",
+ Queues: nil,
+ Limits: []configs.Limit{
+ {
+ Limit: "root
queue limit",
+ Users: []string{
+ user,
"*",
+ },
+ Groups:
[]string{
+ group,
"*",
+ },
+ MaxResources:
map[string]string{
+
"memory": "50",
+
"vcores": "50",
+ },
+
MaxApplications: 10,
+ },
+ },
+ },
+ },
+ Limits: []configs.Limit{
+ {
+ Limit: "root queue limit",
+ Users: []string{
+ user, "*",
+ },
+ Groups: []string{
+ group, "*",
+ },
+ MaxResources: map[string]string{
+ "memory": "100",
+ "vcores": "100",
+ },
+ MaxApplications: 20,
+ },
+ },
+ },
+ },
+ }
+ return conf
+}
+
+func createUpdateConfig(user string, group string) configs.PartitionConfig {
+ return createConfig(user, group, "memory", "10")
+}
+
+func createConfig(user string, group string, resourceKey string, resourceValue
string) configs.PartitionConfig {
+ conf := configs.PartitionConfig{
+ Name: "test",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "root",
+ Parent: true,
+ SubmitACL: "*",
+ Queues: []configs.QueueConfig{
+ {
+ Name: "parent",
+ Parent: true,
+ SubmitACL: "*",
+ Queues: nil,
+ Limits: []configs.Limit{
+ {
+ Limit: "root
queue limit",
+ Users: []string{
+ user,
+ },
+ Groups:
[]string{
+ group,
+ },
+ MaxResources:
map[string]string{
+
"memory": "5",
+
"vcores": "5",
+ },
+
MaxApplications: 1,
+ },
+ },
+ },
+ },
+ Limits: []configs.Limit{
+ {
+ Limit: "root queue limit",
+ Users: []string{
+ user,
+ },
+ Groups: []string{
+ group,
+ },
+ MaxResources: map[string]string{
+ resourceKey:
resourceValue,
+ "vcores": "10",
+ },
+ MaxApplications: 2,
+ },
+ },
+ },
+ },
+ }
+ return conf
+}
+
+func TestUpdateConfigClearEarlierSetLimits(t *testing.T) {
+ setupUGM()
+ // Queue setup:
+ // root->parent
+ user := security.UserGroup{User: "user1", Groups: []string{"group1"}}
+ conf := createUpdateConfig(user.User, user.Groups[0])
+
+ manager := GetUserManager()
+ assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+
+ expectedResource, err :=
resources.NewResourceFromConf(map[string]string{"memory": "5", "vcores": "5"})
+ if err != nil {
+ t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, expectedResource)
+ }
+ assertMaxLimits(t, user, expectedResource, 1)
+
+ user1 := security.UserGroup{User: "user2", Groups: []string{"group2"}}
+ conf = createUpdateConfig(user1.User, user1.Groups[0])
+ assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+
+ expectedResource, err =
resources.NewResourceFromConf(map[string]string{"memory": "5", "vcores": "5"})
+ if err != nil {
+ t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, expectedResource)
+ }
+ assertMaxLimits(t, user, resources.NewResource(), 0)
+ assertMaxLimits(t, user1, expectedResource, 1)
+}
+
+func TestSetMaxLimitsForRemovedUsers(t *testing.T) {
+ setupUGM()
+ // Queue setup:
+ // root->parent
+ user := security.UserGroup{User: "user1", Groups: []string{"group1"}}
+ conf := createUpdateConfig(user.User, user.Groups[0])
+ manager := GetUserManager()
+ assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+
+ expectedResource, err :=
resources.NewResourceFromConf(map[string]string{"memory": "5", "vcores": "5"})
+ if err != nil {
+ t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, expectedResource)
+ }
+ assertMaxLimits(t, user, expectedResource, 1)
+
+ for i := 1; i <= 2; i++ {
+ err = manager.IncreaseTrackedResource(queuePath1, TestApp1,
expectedResource, user)
+ if err != nil {
+ t.Fatalf("unable to increase tracked resource:
queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1,
expectedResource, err)
+ }
+ }
+ assert.Equal(t, manager.GetUserTracker(user.User) != nil, true)
+ assert.Equal(t, manager.GetGroupTracker(user.Groups[0]) != nil, true)
+
+ err = manager.DecreaseTrackedResource(queuePath1, TestApp1,
expectedResource, user, false)
+ if err != nil {
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", queuePath1, TestApp1, expectedResource, err)
+ }
+ err = manager.DecreaseTrackedResource(queuePath1, TestApp1,
expectedResource, user, true)
+ if err != nil {
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", queuePath1, TestApp1, expectedResource, err)
+ }
+ assert.Equal(t, manager.GetUserTracker(user.User) == nil, true)
+ assert.Equal(t, manager.GetGroupTracker(user.Groups[0]) == nil, true)
+
+ err = manager.IncreaseTrackedResource(queuePath1, TestApp1,
expectedResource, user)
+ if err != nil {
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", queuePath1, TestApp1, expectedResource, err)
+ }
+ assert.Equal(t, manager.GetUserTracker(user.User) != nil, true)
+ assert.Equal(t, manager.GetGroupTracker(user.Groups[0]) != nil, true)
+ assertMaxLimits(t, user, expectedResource, 1)
+}
+
+func setupUGM() {
+ manager := GetUserManager()
+ manager.ClearUserTrackers()
+ manager.ClearGroupTrackers()
+}
+
func assertUGM(t *testing.T, userGroup security.UserGroup, expected
*resources.Resource, usersCount int) {
manager := GetUserManager()
assert.Equal(t, usersCount, len(manager.GetUsersResources()),
"userTrackers count should be "+strconv.Itoa(usersCount))
@@ -138,3 +460,15 @@ func assertUGM(t *testing.T, userGroup security.UserGroup,
expected *resources.R
groupRes := manager.GetGroupResources(userGroup.Groups[0])
assert.Equal(t, resources.Equals(groupRes, expected), true)
}
+
+func assertMaxLimits(t *testing.T, userGroup security.UserGroup,
expectedResource *resources.Resource, expectedMaxApps int) {
+ manager := GetUserManager()
+ assert.Equal(t,
manager.GetUserTracker(userGroup.User).queueTracker.maxRunningApps,
uint64(expectedMaxApps*2))
+ assert.Equal(t,
manager.GetGroupTracker(userGroup.Groups[0]).queueTracker.maxRunningApps,
uint64(expectedMaxApps*2))
+ assert.Equal(t,
resources.Equals(manager.GetUserTracker(userGroup.User).queueTracker.maxResourceUsage,
resources.Multiply(expectedResource, 2)), true)
+ assert.Equal(t,
resources.Equals(manager.GetGroupTracker(userGroup.Groups[0]).queueTracker.maxResourceUsage,
resources.Multiply(expectedResource, 2)), true)
+ assert.Equal(t,
manager.GetUserTracker(userGroup.User).queueTracker.childQueueTrackers["parent"].maxRunningApps,
uint64(expectedMaxApps))
+ assert.Equal(t,
manager.GetGroupTracker(userGroup.Groups[0]).queueTracker.childQueueTrackers["parent"].maxRunningApps,
uint64(expectedMaxApps))
+ assert.Equal(t,
resources.Equals(manager.GetUserTracker(userGroup.User).queueTracker.childQueueTrackers["parent"].maxResourceUsage,
expectedResource), true)
+ assert.Equal(t,
resources.Equals(manager.GetGroupTracker(userGroup.Groups[0]).queueTracker.childQueueTrackers["parent"].maxResourceUsage,
expectedResource), true)
+}
diff --git a/pkg/scheduler/ugm/queue_tracker.go
b/pkg/scheduler/ugm/queue_tracker.go
index b7ae8140..9113b7be 100644
--- a/pkg/scheduler/ugm/queue_tracker.go
+++ b/pkg/scheduler/ugm/queue_tracker.go
@@ -20,7 +20,6 @@ package ugm
import (
"fmt"
- "strings"
"go.uber.org/zap"
@@ -34,6 +33,8 @@ type QueueTracker struct {
queueName string
resourceUsage *resources.Resource
runningApplications map[string]bool
+ maxResourceUsage *resources.Resource
+ maxRunningApps uint64
childQueueTrackers map[string]*QueueTracker
}
@@ -65,6 +66,13 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath
string, applicationID
qt.resourceUsage.AddTo(usage)
qt.runningApplications[applicationID] = true
+ log.Logger().Debug("Successfully increased resource usage",
+ zap.String("queue path", queuePath),
+ zap.String("application", applicationID),
+ zap.Stringer("resource", usage),
+ zap.Stringer("total resource after increasing",
qt.resourceUsage),
+ zap.Int("total applications after increasing",
len(qt.runningApplications)))
+
childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
if childQueuePath != "" {
if qt.childQueueTrackers[immediateChildQueueName] == nil {
@@ -92,6 +100,12 @@ func (qt *QueueTracker) decreaseTrackedResource(queuePath
string, applicationID
if removeApp {
delete(qt.runningApplications, applicationID)
}
+ log.Logger().Debug("Successfully decreased resource usage",
+ zap.String("queue path", queuePath),
+ zap.String("application", applicationID),
+ zap.Stringer("resource", usage),
+ zap.Stringer("total resource after decreasing",
qt.resourceUsage),
+ zap.Int("total applications after decreasing",
len(qt.runningApplications)))
childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
if childQueuePath != "" {
@@ -115,6 +129,63 @@ func (qt *QueueTracker) decreaseTrackedResource(queuePath
string, applicationID
return removeQT, nil
}
+func (qt *QueueTracker) getChildQueueTracker(queuePath string) *QueueTracker {
+ var childQueuePath, immediateChildQueueName string
+ childQueuePath, immediateChildQueueName = getChildQueuePath(queuePath)
+ childQueueTracker := qt
+ if childQueuePath != "" {
+ for childQueuePath != "" {
+ if childQueueTracker != nil {
+ if len(childQueueTracker.childQueueTrackers) ==
0 || childQueueTracker.childQueueTrackers[immediateChildQueueName] == nil {
+ newChildQt :=
newQueueTracker(immediateChildQueueName)
+
childQueueTracker.childQueueTrackers[immediateChildQueueName] = newChildQt
+ childQueueTracker = newChildQt
+ } else {
+ childQueueTracker =
childQueueTracker.childQueueTrackers[immediateChildQueueName]
+ }
+ }
+ childQueuePath, immediateChildQueueName =
getChildQueuePath(childQueuePath)
+ }
+ }
+ return childQueueTracker
+}
+
+func (qt *QueueTracker) setMaxApplications(count uint64, queuePath string)
error {
+ log.Logger().Debug("Setting max applications",
+ zap.String("queue path", queuePath),
+ zap.Uint64("max applications", count))
+ childQueueTracker := qt.getChildQueueTracker(queuePath)
+ if childQueueTracker.maxRunningApps != 0 && count != 0 &&
len(childQueueTracker.runningApplications) > int(count) {
+ log.Logger().Warn("Current running applications is greater than
config max applications",
+ zap.String("queue path", queuePath),
+ zap.Uint64("current max applications",
childQueueTracker.maxRunningApps),
+ zap.Int("total running applications",
len(childQueueTracker.runningApplications)),
+ zap.Uint64("config max applications", count))
+ return fmt.Errorf("current running applications is greater than
config max applications for %s", queuePath)
+ } else {
+ childQueueTracker.maxRunningApps = count
+ }
+ return nil
+}
+
+func (qt *QueueTracker) setMaxResources(resource *resources.Resource,
queuePath string) error {
+ log.Logger().Debug("Setting max resources",
+ zap.String("queue path", queuePath),
+ zap.String("max resources", resource.String()))
+ childQueueTracker := qt.getChildQueueTracker(queuePath)
+ if (!resources.Equals(childQueueTracker.maxResourceUsage,
resources.NewResource()) && !resources.Equals(resource,
resources.NewResource())) &&
resources.StrictlyGreaterThan(childQueueTracker.resourceUsage, resource) {
+ log.Logger().Warn("Current resource usage is greater than
config max resource",
+ zap.String("queue path", queuePath),
+ zap.String("current max resource usage",
childQueueTracker.maxResourceUsage.String()),
+ zap.String("total resource usage",
childQueueTracker.resourceUsage.String()),
+ zap.String("config max resources", resource.String()))
+ return fmt.Errorf("current resource usage is greater than
config max resource for %s", queuePath)
+ } else {
+ childQueueTracker.maxResourceUsage = resource
+ }
+ return nil
+}
+
func (qt *QueueTracker) getResourceUsageDAOInfo(parentQueuePath string)
*dao.ResourceUsageDAOInfo {
if qt == nil {
return &dao.ResourceUsageDAOInfo{}
@@ -130,25 +201,11 @@ func (qt *QueueTracker)
getResourceUsageDAOInfo(parentQueuePath string) *dao.Res
for app := range qt.runningApplications {
usage.RunningApplications = append(usage.RunningApplications,
app)
}
+ usage.MaxResources = qt.maxResourceUsage
+ usage.MaxApplications = qt.maxRunningApps
for _, cqt := range qt.childQueueTrackers {
childUsage := cqt.getResourceUsageDAOInfo(fullQueuePath)
usage.Children = append(usage.Children, childUsage)
}
return usage
}
-
-func getChildQueuePath(queuePath string) (string, string) {
- idx := strings.Index(queuePath, configs.DOT)
- childQueuePath := ""
- if idx != -1 {
- childQueuePath = queuePath[idx+1:]
- }
-
- childIndex := strings.Index(childQueuePath, configs.DOT)
- immediateChildQueueName := childQueuePath
- if childIndex != -1 {
- immediateChildQueueName = childQueuePath[:childIndex]
- }
-
- return childQueuePath, immediateChildQueueName
-}
diff --git a/pkg/scheduler/ugm/queue_tracker_test.go
b/pkg/scheduler/ugm/queue_tracker_test.go
index 25e2736e..1a79444e 100644
--- a/pkg/scheduler/ugm/queue_tracker_test.go
+++ b/pkg/scheduler/ugm/queue_tracker_test.go
@@ -184,20 +184,6 @@ func TestQTDecreaseTrackedResource(t *testing.T) {
}
}
-func TestGetChildQueuePath(t *testing.T) {
- childPath, immediateChildName := getChildQueuePath("root.parent.leaf")
- assert.Equal(t, childPath, "parent.leaf")
- assert.Equal(t, immediateChildName, "parent")
-
- childPath, immediateChildName = getChildQueuePath("parent.leaf")
- assert.Equal(t, childPath, "leaf")
- assert.Equal(t, immediateChildName, "leaf")
-
- childPath, immediateChildName = getChildQueuePath("leaf")
- assert.Equal(t, childPath, "")
- assert.Equal(t, immediateChildName, "")
-}
-
func getQTResource(qt *QueueTracker) map[string]*resources.Resource {
resources := make(map[string]*resources.Resource)
usage := qt.getResourceUsageDAOInfo("")
diff --git a/pkg/scheduler/ugm/user_tracker.go
b/pkg/scheduler/ugm/user_tracker.go
index 47455c58..85d508d8 100644
--- a/pkg/scheduler/ugm/user_tracker.go
+++ b/pkg/scheduler/ugm/user_tracker.go
@@ -22,7 +22,6 @@ import (
"sync"
"github.com/apache/yunikorn-core/pkg/common/resources"
- "github.com/apache/yunikorn-core/pkg/common/security"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
)
@@ -39,10 +38,10 @@ type UserTracker struct {
sync.RWMutex
}
-func newUserTracker(user security.UserGroup) *UserTracker {
+func newUserTracker(user string) *UserTracker {
queueTracker := newRootQueueTracker()
userTracker := &UserTracker{
- userName: user.User,
+ userName: user,
appGroupTrackers: make(map[string]*GroupTracker),
queueTracker: queueTracker,
}
@@ -84,6 +83,18 @@ func (ut *UserTracker) getTrackedApplications()
map[string]*GroupTracker {
return ut.appGroupTrackers
}
+func (ut *UserTracker) setMaxApplications(count uint64, queuePath string)
error {
+ ut.Lock()
+ defer ut.Unlock()
+ return ut.queueTracker.setMaxApplications(count, queuePath)
+}
+
+func (ut *UserTracker) setMaxResources(resource *resources.Resource, queuePath
string) error {
+ ut.Lock()
+ defer ut.Unlock()
+ return ut.queueTracker.setMaxResources(resource, queuePath)
+}
+
func (ut *UserTracker) GetUserResourceUsageDAOInfo()
*dao.UserResourceUsageDAOInfo {
ut.RLock()
defer ut.RUnlock()
diff --git a/pkg/scheduler/ugm/user_tracker_test.go
b/pkg/scheduler/ugm/user_tracker_test.go
index 1d97395d..d8e5485d 100644
--- a/pkg/scheduler/ugm/user_tracker_test.go
+++ b/pkg/scheduler/ugm/user_tracker_test.go
@@ -45,7 +45,7 @@ func TestIncreaseTrackedResource(t *testing.T) {
// root->parent->child2
// root->parent->child12 (similar name like above leaf queue, but it is
being treated differently as similar names are allowed)
user := security.UserGroup{User: "test", Groups: []string{"test"}}
- userTracker := newUserTracker(user)
+ userTracker := newUserTracker(user.User)
usage1, err := resources.NewResourceFromConf(map[string]string{"mem":
"10M", "vcore": "10"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage1)
@@ -105,7 +105,7 @@ func TestDecreaseTrackedResource(t *testing.T) {
// root->parent->child1
// root->parent->child2
user := security.UserGroup{User: "test", Groups: []string{"test"}}
- userTracker := newUserTracker(user)
+ userTracker := newUserTracker(user.User)
usage1, err := resources.NewResourceFromConf(map[string]string{"mem":
"70M", "vcore": "70"})
if err != nil {
@@ -182,6 +182,52 @@ func TestDecreaseTrackedResource(t *testing.T) {
assert.Equal(t, removeQT, true, "wrong remove queue tracker value")
}
+func TestSetMaxLimits(t *testing.T) {
+ // Queue setup:
+ // root->parent->child1
+ user := security.UserGroup{User: "test", Groups: []string{"test"}}
+ userTracker := newUserTracker(user.User)
+ usage1, err := resources.NewResourceFromConf(map[string]string{"mem":
"10M", "vcore": "10"})
+ if err != nil {
+ t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage1)
+ }
+ err = userTracker.increaseTrackedResource(queuePath1, TestApp1, usage1)
+ if err != nil {
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", queuePath1, TestApp1, usage1, err)
+ }
+ groupTracker := newGroupTracker(user.User)
+ userTracker.setGroupForApp(TestApp1, groupTracker)
+
+ setMaxAppsErr := userTracker.setMaxApplications(1, queuePath1)
+ assert.NilError(t, setMaxAppsErr)
+
+ setMaxResourcesErr := userTracker.setMaxResources(usage1, queuePath1)
+ assert.NilError(t, setMaxResourcesErr)
+
+ setParentMaxAppsErr := userTracker.setMaxApplications(1, "root.parent")
+ assert.NilError(t, setParentMaxAppsErr)
+
+ setParentMaxResourcesErr := userTracker.setMaxResources(usage1,
"root.parent")
+ assert.NilError(t, setParentMaxResourcesErr)
+
+ err = userTracker.increaseTrackedResource(queuePath1, TestApp2, usage1)
+ if err != nil {
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", queuePath1, TestApp2, usage1, err)
	}
+
+ setMaxAppsErr1 := userTracker.setMaxApplications(1, queuePath1)
+ assert.Error(t, setMaxAppsErr1, "current running applications is
greater than config max applications for "+queuePath1)
+
+ setMaxResourcesErr1 := userTracker.setMaxResources(usage1, queuePath1)
+ assert.Error(t, setMaxResourcesErr1, "current resource usage is greater
than config max resource for "+queuePath1)
+
+ setParentMaxAppsErr1 := userTracker.setMaxApplications(1, "root.parent")
+ assert.Error(t, setParentMaxAppsErr1, "current running applications is
greater than config max applications for root.parent")
+
+ setParentMaxResourcesErr1 := userTracker.setMaxResources(usage1,
"root.parent")
+ assert.Error(t, setParentMaxResourcesErr1, "current resource usage is
greater than config max resource for root.parent")
+}
+
func getUserResource(ut *UserTracker) map[string]*resources.Resource {
resources := make(map[string]*resources.Resource)
usage := ut.GetUserResourceUsageDAOInfo()
diff --git a/pkg/scheduler/ugm/utilities_test.go
b/pkg/scheduler/ugm/utilities.go
similarity index 64%
copy from pkg/scheduler/ugm/utilities_test.go
copy to pkg/scheduler/ugm/utilities.go
index 9952e985..3b7795e4 100644
--- a/pkg/scheduler/ugm/utilities_test.go
+++ b/pkg/scheduler/ugm/utilities.go
@@ -19,16 +19,20 @@
package ugm
import (
- "github.com/apache/yunikorn-core/pkg/common/resources"
- "github.com/apache/yunikorn-core/pkg/webservice/dao"
+ "strings"
+
+ "github.com/apache/yunikorn-core/pkg/common/configs"
)
-func internalGetResource(usage *dao.ResourceUsageDAOInfo, resources
map[string]*resources.Resource) map[string]*resources.Resource {
- resources[usage.QueuePath] = usage.ResourceUsage
- if len(usage.Children) > 0 {
- for _, resourceUsage := range usage.Children {
- internalGetResource(resourceUsage, resources)
- }
+func getChildQueuePath(queuePath string) (string, string) {
+ idx := strings.Index(queuePath, configs.DOT)
+ if idx == -1 {
+ return "", ""
+ }
+ childQueuePath := queuePath[idx+1:]
+ idx = strings.Index(childQueuePath, configs.DOT)
+ if idx == -1 {
+ return childQueuePath, childQueuePath
}
- return resources
+ return childQueuePath, childQueuePath[:idx]
}
diff --git a/pkg/scheduler/ugm/utilities_test.go
b/pkg/scheduler/ugm/utilities_test.go
index 9952e985..8e38ae1f 100644
--- a/pkg/scheduler/ugm/utilities_test.go
+++ b/pkg/scheduler/ugm/utilities_test.go
@@ -19,6 +19,10 @@
package ugm
import (
+ "testing"
+
+ "gotest.tools/v3/assert"
+
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
)
@@ -32,3 +36,17 @@ func internalGetResource(usage *dao.ResourceUsageDAOInfo,
resources map[string]*
}
return resources
}
+
+func TestGetChildQueuePath(t *testing.T) {
+ childPath, immediateChildName := getChildQueuePath("root.parent.leaf")
+ assert.Equal(t, childPath, "parent.leaf")
+ assert.Equal(t, immediateChildName, "parent")
+
+ childPath, immediateChildName = getChildQueuePath("parent.leaf")
+ assert.Equal(t, childPath, "leaf")
+ assert.Equal(t, immediateChildName, "leaf")
+
+ childPath, immediateChildName = getChildQueuePath("leaf")
+ assert.Equal(t, childPath, "")
+ assert.Equal(t, immediateChildName, "")
+}
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index 13e7f763..99daf94e 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -19,6 +19,7 @@
package scheduler
import (
+ "strconv"
"testing"
"gotest.tools/v3/assert"
@@ -29,25 +30,28 @@ import (
"github.com/apache/yunikorn-core/pkg/rmproxy"
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
+ "github.com/apache/yunikorn-core/pkg/webservice/dao"
"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
const (
- appID1 = "app-1"
- appID2 = "app-2"
- appID3 = "app-3"
- nodeID1 = "node-1"
- instType1 = "itype-1"
- nodeID2 = "node-2"
- instType2 = "itype-2"
- defQueue = "root.default"
- rmID = "testRM"
- taskGroup = "tg-1"
- phID = "ph-1"
- allocID = "alloc-1"
- allocID2 = "alloc-2"
- allocID3 = "alloc-3"
+ appID1 = "app-1"
+ appID2 = "app-2"
+ appID3 = "app-3"
+ nodeID1 = "node-1"
+ instType1 = "itype-1"
+ nodeID2 = "node-2"
+ instType2 = "itype-2"
+ defQueue = "root.default"
+ rmID = "testRM"
+ taskGroup = "tg-1"
+ phID = "ph-1"
+ allocID = "alloc-1"
+ allocID2 = "alloc-2"
+ allocID3 = "alloc-3"
+ maxresources = "maxresources"
+ maxapplications = "maxapplications"
)
func newBasePartition() (*PartitionContext, error) {
@@ -63,6 +67,38 @@ func newBasePartition() (*PartitionContext, error) {
Name: "default",
Parent: false,
Queues: nil,
+ Limits: []configs.Limit{
+ {
+ Limit: "default
queue limit",
+ Users: []string{
+
"testuser",
+ },
+ Groups:
[]string{
+
"testgroup",
+ },
+ MaxResources:
map[string]string{
+
"memory": "5",
+
"vcores": "5",
+ },
+
MaxApplications: 1,
+ },
+ },
+ },
+ },
+ Limits: []configs.Limit{
+ {
+ Limit: "root queue limit",
+ Users: []string{
+ "testuser",
+ },
+ Groups: []string{
+ "testgroup",
+ },
+ MaxResources: map[string]string{
+ "memory": "10",
+ "vcores": "10",
+ },
+ MaxApplications: 2,
},
},
},
@@ -88,6 +124,22 @@ func newConfiguredPartition() (*PartitionContext, error) {
Name: "leaf",
Parent: false,
Queues: nil,
+ Limits: []configs.Limit{
+ {
+ Limit: "leaf
queue limit",
+ Users: []string{
+
"testuser",
+ },
+ Groups:
[]string{
+
"testgroup",
+ },
+ MaxResources:
map[string]string{
+
"memory": "5",
+
"vcores": "5",
+ },
+
MaxApplications: 1,
+ },
+ },
}, {
Name: "parent",
Parent: true,
@@ -96,10 +148,58 @@ func newConfiguredPartition() (*PartitionContext, error) {
Name:
"sub-leaf",
Parent: false,
Queues: nil,
+ Limits:
[]configs.Limit{
+ {
+
Limit: "sub-leaf queue limit",
+
Users: []string{
+
"testuser",
+
},
+
Groups: []string{
+
"testgroup",
+
},
+
MaxResources: map[string]string{
+
"memory": "3",
+
"vcores": "3",
+
},
+
MaxApplications: 2,
+ },
+ },
+ },
+ },
+ Limits: []configs.Limit{
+ {
+ Limit: "parent
queue limit",
+ Users: []string{
+
"testuser",
+ },
+ Groups:
[]string{
+
"testgroup",
+ },
+ MaxResources:
map[string]string{
+
"memory": "5",
+
"vcores": "5",
+ },
+
MaxApplications: 2,
},
},
},
},
+ Limits: []configs.Limit{
+ {
+ Limit: "root queue limit",
+ Users: []string{
+ "testuser",
+ },
+ Groups: []string{
+ "testgroup",
+ },
+ MaxResources: map[string]string{
+ "memory": "10",
+ "vcores": "10",
+ },
+ MaxApplications: 2,
+ },
+ },
},
},
PlacementRules: nil,
@@ -133,6 +233,22 @@ func newPreemptionConfiguredPartition(parentLimit
map[string]string, leafGuarant
Guaranteed: leafGuarantees,
},
Properties:
map[string]string{configs.PreemptionDelay: "1ms"},
+ Limits:
[]configs.Limit{
+ {
+
Limit: "leaf1 queue limit",
+
Users: []string{
+
"testuser",
+
},
+
Groups: []string{
+
"testgroup",
+
},
+
MaxResources: map[string]string{
+
"memory": "5",
+
"vcores": "5",
+
},
+
MaxApplications: 1,
+ },
+ },
},
{
Name: "leaf2",
@@ -142,14 +258,61 @@ func newPreemptionConfiguredPartition(parentLimit
map[string]string, leafGuarant
Guaranteed: leafGuarantees,
},
Properties:
map[string]string{configs.PreemptionDelay: "1ms"},
+ Limits:
[]configs.Limit{
+ {
+
Limit: "leaf2 queue limit",
+
Users: []string{
+
"testuser",
+
},
+
Groups: []string{
+
"testgroup",
+
},
+
MaxResources: map[string]string{
+
"memory": "5",
+
"vcores": "5",
+
},
+
MaxApplications: 1,
+ },
+ },
},
},
+ Limits: []configs.Limit{
+ {
+ Limit: "parent
queue limit",
+ Users: []string{
+
"testuser",
+ },
+ Groups:
[]string{
+
"testgroup",
+ },
+ MaxResources:
map[string]string{
+
"memory": "5",
+
"vcores": "5",
+ },
+
MaxApplications: 2,
+ },
+ },
+ },
+ },
+ Limits: []configs.Limit{
+ {
+ Limit: "root queue limit",
+ Users: []string{
+ "testuser",
+ },
+ Groups: []string{
+ "testgroup",
+ },
+ MaxResources: map[string]string{
+ "memory": "10",
+ "vcores": "10",
+ },
+ MaxApplications: 2,
},
},
},
},
PlacementRules: nil,
- Limits: nil,
NodeSortPolicy: configs.NodeSortingPolicy{},
}
return newPartitionContext(conf, rmID, nil)
@@ -171,6 +334,38 @@ func newLimitedPartition(resLimit map[string]string)
(*PartitionContext, error)
Resources: configs.Resources{
Max: resLimit,
},
+ Limits: []configs.Limit{
+ {
+ Limit: "limited
queue limit",
+ Users: []string{
+
"testuser",
+ },
+ Groups:
[]string{
+
"testgroup",
+ },
+ MaxResources:
map[string]string{
+
"memory": "5",
+
"vcores": "5",
+ },
+
MaxApplications: 2,
+ },
+ },
+ },
+ },
+ Limits: []configs.Limit{
+ {
+ Limit: "root queue limit",
+ Users: []string{
+ "testuser",
+ },
+ Groups: []string{
+ "testgroup",
+ },
+ MaxResources: map[string]string{
+ "memory": "10",
+ "vcores": "10",
+ },
+ MaxApplications: 2,
},
},
},
@@ -192,6 +387,22 @@ func newPlacementPartition() (*PartitionContext, error) {
Parent: true,
SubmitACL: "*",
Queues: nil,
+ Limits: []configs.Limit{
+ {
+ Limit: "root queue limit",
+ Users: []string{
+ "testuser",
+ },
+ Groups: []string{
+ "testgroup",
+ },
+ MaxResources: map[string]string{
+ "memory": "10",
+ "vcores": "10",
+ },
+ MaxApplications: 2,
+ },
+ },
},
},
PlacementRules: []configs.PlacementRule{
@@ -375,10 +586,78 @@ func getTestUserGroup() security.UserGroup {
return security.UserGroup{User: "testuser", Groups:
[]string{"testgroup"}}
}
-func assertUserGroupResource(t *testing.T, userGroup security.UserGroup,
expected *resources.Resource) {
- ugm := ugm.GetUserManager()
- userResource := ugm.GetUserResources(userGroup)
- groupResource := ugm.GetGroupResources(userGroup.Groups[0])
+func assertLimits(t *testing.T, userGroup security.UserGroup, expected
*resources.Resource) {
+ expectedQueuesMaxLimits := make(map[string]map[string]interface{})
+ expectedQueuesMaxLimits["root"] = make(map[string]interface{})
+ expectedQueuesMaxLimits["root.default"] = make(map[string]interface{})
+ expectedQueuesMaxLimits["root"][maxresources] =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10,
"vcores": 10})
+ expectedQueuesMaxLimits["root.default"][maxresources] =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 5,
"vcores": 5})
+ expectedQueuesMaxLimits["root"][maxapplications] = uint64(2)
+ expectedQueuesMaxLimits["root.default"][maxapplications] = uint64(1)
+ assertUserGroupResourceMaxLimits(t, userGroup, expected,
expectedQueuesMaxLimits)
+}
+
+func assertUserGroupResourceMaxLimits(t *testing.T, userGroup
security.UserGroup, expected *resources.Resource, expectedQueuesMaxLimits
map[string]map[string]interface{}) {
+ manager := ugm.GetUserManager()
+ userResource := manager.GetUserResources(userGroup)
+ groupResource := manager.GetGroupResources(userGroup.Groups[0])
+ ut := manager.GetUserTracker(userGroup.User)
+ if ut != nil {
+ maxResources := make(map[string]*resources.Resource)
+ usage := ut.GetUserResourceUsageDAOInfo()
+ getMaxResource(usage.Queues, maxResources)
+ for q, qMaxLimits := range expectedQueuesMaxLimits {
+ if qRes, ok := maxResources[q]; ok {
+ assert.Equal(t, resources.Equals(qRes,
qMaxLimits[maxresources].(*resources.Resource)), true)
+ }
+ }
+ maxApplications := make(map[string]uint64)
+ getMaxApplications(usage.Queues, maxApplications)
+ for q, qMaxLimits := range expectedQueuesMaxLimits {
+ if qApps, ok := maxApplications[q]; ok {
+ assert.Equal(t, qApps,
qMaxLimits[maxapplications].(uint64), "queue path is "+q+" actual:
"+strconv.Itoa(int(qApps))+", expected:
"+strconv.Itoa(int(qMaxLimits[maxapplications].(uint64))))
+ }
+ }
+ }
+
+	gt := manager.GetGroupTracker(userGroup.Groups[0])
+	if gt != nil {
+		gMaxResources := make(map[string]*resources.Resource)
+		gUsage := gt.GetGroupResourceUsageDAOInfo()
+ getMaxResource(gUsage.Queues, gMaxResources)
+ for q, qMaxLimits := range expectedQueuesMaxLimits {
+ if qRes, ok := gMaxResources[q]; ok {
+ assert.Equal(t, resources.Equals(qRes,
qMaxLimits[maxresources].(*resources.Resource)), true)
+ }
+ }
+ gMaxApps := make(map[string]uint64)
+ getMaxApplications(gUsage.Queues, gMaxApps)
+ for q, qMaxLimits := range expectedQueuesMaxLimits {
+ if qApps, ok := gMaxApps[q]; ok {
+ assert.Equal(t, qApps,
qMaxLimits[maxapplications].(uint64))
+ }
+ }
+ }
assert.Equal(t, resources.Equals(userResource, expected), true)
assert.Equal(t, resources.Equals(groupResource, expected), true)
}
+
+func getMaxResource(usage *dao.ResourceUsageDAOInfo, maxResources
map[string]*resources.Resource) map[string]*resources.Resource {
+ maxResources[usage.QueuePath] = usage.MaxResources
+ if len(usage.Children) > 0 {
+ for _, resourceUsage := range usage.Children {
+ getMaxResource(resourceUsage, maxResources)
+ }
+ }
+ return maxResources
+}
+
+func getMaxApplications(usage *dao.ResourceUsageDAOInfo, maxApplications
map[string]uint64) map[string]uint64 {
+ maxApplications[usage.QueuePath] = usage.MaxApplications
+ if len(usage.Children) > 0 {
+ for _, resourceUsage := range usage.Children {
+ getMaxApplications(resourceUsage, maxApplications)
+ }
+ }
+ return maxApplications
+}
diff --git a/pkg/webservice/dao/ugm_info.go b/pkg/webservice/dao/ugm_info.go
index 59d7dbe0..9d7dfbd2 100644
--- a/pkg/webservice/dao/ugm_info.go
+++ b/pkg/webservice/dao/ugm_info.go
@@ -36,5 +36,7 @@ type ResourceUsageDAOInfo struct {
QueuePath string `json:"queuePath"`
ResourceUsage *resources.Resource `json:"resourceUsage"`
RunningApplications []string `json:"runningApplications"`
+ MaxResources *resources.Resource `json:"maxResources"`
+ MaxApplications uint64 `json:"maxApplications"`
Children []*ResourceUsageDAOInfo `json:"children"`
}
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index 5bae339d..2076dccb 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -30,7 +30,6 @@ import (
"strings"
"github.com/julienschmidt/httprouter"
-
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"gopkg.in/yaml.v2"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]