This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 9c601bd6 [YUNIKORN-2777] Improve TrackedResource type (#928)
9c601bd6 is described below
commit 9c601bd611155bb905e57b2eaaca4f8dc559c41e
Author: Peter Bacsko <[email protected]>
AuthorDate: Tue Aug 13 14:16:04 2024 +0200
[YUNIKORN-2777] Improve TrackedResource type (#928)
Closes: #928
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/common/resources/tracked_resources.go | 43 +++++---------
pkg/common/resources/tracked_resources_test.go | 82 +++++++++++++-------------
pkg/scheduler/objects/application_test.go | 10 ++--
pkg/scheduler/partition_test.go | 11 ++--
4 files changed, 67 insertions(+), 79 deletions(-)
diff --git a/pkg/common/resources/tracked_resources.go
b/pkg/common/resources/tracked_resources.go
index 09e8a022..6b614384 100644
--- a/pkg/common/resources/tracked_resources.go
+++ b/pkg/common/resources/tracked_resources.go
@@ -31,23 +31,28 @@ type TrackedResource struct {
// TrackedResourceMap is a two-level map for aggregated resource usage.
// The top-level key is the instance type, and the value is a map:
// resource type (CPU, memory, etc.) -> aggregated used time (in
seconds) of the resource type.
- TrackedResourceMap map[string]map[string]int64
+ TrackedResourceMap map[string]*Resource
locking.RWMutex
}
// NewTrackedResource creates a new instance of TrackedResource.
func NewTrackedResource() *TrackedResource {
- return &TrackedResource{TrackedResourceMap:
make(map[string]map[string]int64)}
+ return &TrackedResource{TrackedResourceMap: make(map[string]*Resource)}
}
// NewTrackedResourceFromMap creates NewTrackedResource from the given map.
// Using for Testing purpose only.
-func NewTrackedResourceFromMap(m map[string]map[string]int64) *TrackedResource
{
+func NewTrackedResourceFromMap(m map[string]map[string]Quantity)
*TrackedResource {
if m == nil {
return NewTrackedResource()
}
- return &TrackedResource{TrackedResourceMap: m}
+
+ trackedMap := make(map[string]*Resource)
+ for inst, inner := range m {
+ trackedMap[inst] = NewResourceFromMap(inner)
+ }
+ return &TrackedResource{TrackedResourceMap: trackedMap}
}
func (tr *TrackedResource) String() string {
@@ -56,7 +61,7 @@ func (tr *TrackedResource) String() string {
var resourceUsage []string
for instanceType, resourceTypeMap := range tr.TrackedResourceMap {
- for resourceType, usageTime := range resourceTypeMap {
+ for resourceType, usageTime := range resourceTypeMap.Resources {
resourceUsage = append(resourceUsage,
fmt.Sprintf("%s:%s=%d", instanceType, resourceType, usageTime))
}
}
@@ -73,11 +78,7 @@ func (tr *TrackedResource) Clone() *TrackedResource {
tr.RLock()
defer tr.RUnlock()
for k, v := range tr.TrackedResourceMap {
- dest := make(map[string]int64)
- for key, element := range v {
- dest[key] = element
- }
- ret.TrackedResourceMap[k] = dest
+ ret.TrackedResourceMap[k] = v.Clone()
}
return ret
}
@@ -96,10 +97,10 @@ func (tr *TrackedResource)
AggregateTrackedResource(instType string,
timeDiff := int64(releaseTime.Sub(bindTime).Seconds())
aggregatedResourceTime, ok := tr.TrackedResourceMap[instType]
if !ok {
- aggregatedResourceTime = map[string]int64{}
+ aggregatedResourceTime = NewResource()
}
for key, element := range resource.Resources {
- aggregatedResourceTime[key] += int64(element) * timeDiff
+ aggregatedResourceTime.Resources[key] += element *
Quantity(timeDiff)
}
tr.TrackedResourceMap[instType] = aggregatedResourceTime
}
@@ -119,23 +120,7 @@ func EqualsTracked(left, right *TrackedResource) bool {
return false
}
- if !equalsMapContents(v, inner) {
- return false
- }
- }
-
- return true
-}
-
-func equalsMapContents(left, right map[string]int64) bool {
- for k, v := range left {
- if right[k] != v {
- return false
- }
- }
-
- for k, v := range right {
- if left[k] != v {
+ if !Equals(v, inner) {
return false
}
}
diff --git a/pkg/common/resources/tracked_resources_test.go
b/pkg/common/resources/tracked_resources_test.go
index 5ca00832..2d420053 100644
--- a/pkg/common/resources/tracked_resources_test.go
+++ b/pkg/common/resources/tracked_resources_test.go
@@ -37,12 +37,12 @@ func CheckLenOfTrackedResource(res *TrackedResource,
expected int) (bool, string
return true, ""
}
-func CheckResourceValueOfTrackedResource(res *TrackedResource, expected
map[string]map[string]int64) (bool, string) {
- for instanceType, resources := range expected {
- for key, value := range resources {
- if got := res.TrackedResourceMap[instanceType][key];
got != value {
- return false, fmt.Sprintf("instance type %s,
resource %s, expected %d, got %d", instanceType, key, value, got)
- }
+func CheckResourceValueOfTrackedResource(res *TrackedResource, expected
map[string]map[string]Quantity) (bool, string) {
+ for instanceType, expected := range expected {
+ trackedRes := res.TrackedResourceMap[instanceType]
+ expectedRes := NewResourceFromMap(expected)
+ if !Equals(trackedRes, expectedRes) {
+ return false, fmt.Sprintf("instance type %s, expected
%s, got %s", instanceType, trackedRes, expectedRes)
}
}
return true, ""
@@ -51,37 +51,37 @@ func CheckResourceValueOfTrackedResource(res
*TrackedResource, expected map[stri
func TestNewTrackedResourceFromMap(t *testing.T) {
type outputs struct {
length int
- trackedResources map[string]map[string]int64
+ trackedResources map[string]map[string]Quantity
}
var tests = []struct {
caseName string
- input map[string]map[string]int64
+ input map[string]map[string]Quantity
expected outputs
}{
{
"nil",
nil,
- outputs{0, map[string]map[string]int64{}},
+ outputs{0, map[string]map[string]Quantity{}},
},
{
"empty",
- map[string]map[string]int64{},
- outputs{0, map[string]map[string]int64{}},
+ map[string]map[string]Quantity{},
+ outputs{0, map[string]map[string]Quantity{}},
},
{
"tracked resources of one instance type",
- map[string]map[string]int64{"instanceType1": {"first":
1}},
- outputs{1, map[string]map[string]int64{"instanceType1":
{"first": 1}}},
+ map[string]map[string]Quantity{"instanceType1":
{"first": 1}},
+ outputs{1,
map[string]map[string]Quantity{"instanceType1": {"first": 1}}},
},
{
"tracked resources of two instance type",
- map[string]map[string]int64{"instanceType1": {"first":
0}, "instanceType2": {"second": -1}},
- outputs{2, map[string]map[string]int64{"instanceType1":
{"first": 0}, "instanceType2": {"second": -1}}},
+ map[string]map[string]Quantity{"instanceType1":
{"first": 0}, "instanceType2": {"second": -1}},
+ outputs{2,
map[string]map[string]Quantity{"instanceType1": {"first": 0}, "instanceType2":
{"second": -1}}},
},
{
"Multiple tracked resources for one instance type",
- map[string]map[string]int64{"instanceType1": {"first":
1, "second": -2, "third": 3}},
- outputs{1, map[string]map[string]int64{"instanceType1":
{"first": 1, "second": -2, "third": 3}}},
+ map[string]map[string]Quantity{"instanceType1":
{"first": 1, "second": -2, "third": 3}},
+ outputs{1,
map[string]map[string]Quantity{"instanceType1": {"first": 1, "second": -2,
"third": 3}}},
},
}
for _, tt := range tests {
@@ -101,7 +101,7 @@ func TestNewTrackedResourceFromMap(t *testing.T) {
func TestTrackedResourceClone(t *testing.T) {
var tests = []struct {
caseName string
- input map[string]map[string]int64
+ input map[string]map[string]Quantity
}{
{
"Nil check",
@@ -109,11 +109,11 @@ func TestTrackedResourceClone(t *testing.T) {
},
{
"No Resources in TrackedResources",
- map[string]map[string]int64{},
+ map[string]map[string]Quantity{},
},
{
"Proper TrackedResource mappings",
- map[string]map[string]int64{"instanceType1": {"first":
1, "second": -2}, "instanceType2": {"second": -2, "third": 3}},
+ map[string]map[string]Quantity{"instanceType1":
{"first": 1, "second": -2}, "instanceType2": {"second": -2, "third": 3}},
},
}
for _, tt := range tests {
@@ -138,7 +138,7 @@ func TestTrackedResourceClone(t *testing.T) {
// The likelihood of a test failure due to this should be extremely low.
func TestTrackedResourceAggregateTrackedResource(t *testing.T) {
type inputs struct {
- trackedResource map[string]map[string]int64
+ trackedResource map[string]map[string]Quantity
instType string
otherResource map[string]Quantity
bindTime time.Time
@@ -147,57 +147,57 @@ func TestTrackedResourceAggregateTrackedResource(t
*testing.T) {
var tests = []struct {
caseName string
input inputs
- expectedTrackedResource map[string]map[string]int64
+ expectedTrackedResource map[string]map[string]Quantity
}{
{
"Resource to be aggregated Nil Check",
inputs{
- map[string]map[string]int64{"instanceType1":
{"first": 1, "second": -2}},
+ map[string]map[string]Quantity{"instanceType1":
{"first": 1, "second": -2}},
"instanceType1",
nil,
time.Now().Add(-time.Minute),
},
- map[string]map[string]int64{"instanceType1": {"first":
1, "second": -2}},
+ map[string]map[string]Quantity{"instanceType1":
{"first": 1, "second": -2}},
},
{
"Resource to be aggregated is Empty",
inputs{
- map[string]map[string]int64{"instanceType1":
{"first": 1, "second": -2}},
+ map[string]map[string]Quantity{"instanceType1":
{"first": 1, "second": -2}},
"instanceType1",
map[string]Quantity{},
time.Now().Add(-time.Minute),
},
- map[string]map[string]int64{"instanceType1": {"first":
1, "second": -2}},
+ map[string]map[string]Quantity{"instanceType1":
{"first": 1, "second": -2}},
},
{
"TrackedResource is empty",
inputs{
- map[string]map[string]int64{},
+ map[string]map[string]Quantity{},
"instanceType1",
map[string]Quantity{"first": 1, "second": 2},
time.Now().Add(-time.Minute),
},
- map[string]map[string]int64{"instanceType1": {"first":
60, "second": 120}},
+ map[string]map[string]Quantity{"instanceType1":
{"first": 60, "second": 120}},
},
{
"With Negative Values Involved",
inputs{
- map[string]map[string]int64{"instanceType1":
{"first": 1, "second": -2}},
+ map[string]map[string]Quantity{"instanceType1":
{"first": 1, "second": -2}},
"instanceType1",
map[string]Quantity{"first": 1, "second": 2},
time.Now().Add(-time.Minute),
},
- map[string]map[string]int64{"instanceType1": {"first":
61, "second": 118}},
+ map[string]map[string]Quantity{"instanceType1":
{"first": 61, "second": 118}},
},
{
"Multiple Instance Types",
inputs{
- map[string]map[string]int64{"instanceType1":
{"first": 1, "second": 2}, "instanceType2": {"third": 3, "four": 4}},
+ map[string]map[string]Quantity{"instanceType1":
{"first": 1, "second": 2}, "instanceType2": {"third": 3, "four": 4}},
"instanceType2",
map[string]Quantity{"first": 1, "second": 2},
time.Now().Add(-time.Minute),
},
- map[string]map[string]int64{"instanceType1": {"first":
1, "second": 2}, "instanceType2": {"first": 60, "second": 120, "third": 3,
"four": 4}},
+ map[string]map[string]Quantity{"instanceType1":
{"first": 1, "second": 2}, "instanceType2": {"first": 60, "second": 120,
"third": 3, "four": 4}},
},
}
for _, tt := range tests {
@@ -215,8 +215,8 @@ func TestTrackedResourceAggregateTrackedResource(t
*testing.T) {
func TestEqualsTracked(t *testing.T) {
type inputs struct {
- base map[string]map[string]int64
- compare map[string]map[string]int64
+ base map[string]map[string]Quantity
+ compare map[string]map[string]Quantity
}
var tests = []struct {
caseName string
@@ -224,25 +224,25 @@ func TestEqualsTracked(t *testing.T) {
expected bool
}{
{"simple cases (nil checks)", inputs{nil, nil}, true},
- {"simple cases (nil checks)",
inputs{map[string]map[string]int64{}, nil}, false},
+ {"simple cases (nil checks)",
inputs{map[string]map[string]Quantity{}, nil}, false},
{"same first and second level keys and different resource
value",
- inputs{map[string]map[string]int64{"first": {"val":
10}}, map[string]map[string]int64{"first": {"val": 0}}},
+ inputs{map[string]map[string]Quantity{"first": {"val":
10}}, map[string]map[string]Quantity{"first": {"val": 0}}},
false,
},
{"different first-level key, same second-level key, same
resource value",
- inputs{map[string]map[string]int64{"first": {"val":
10}}, map[string]map[string]int64{"second": {"val": 10}}},
+ inputs{map[string]map[string]Quantity{"first": {"val":
10}}, map[string]map[string]Quantity{"second": {"val": 10}}},
false},
{"same first-level key, different second-level key, same
resource value",
- inputs{map[string]map[string]int64{"first": {"val":
10}}, map[string]map[string]int64{"first": {"value": 10}}},
+ inputs{map[string]map[string]Quantity{"first": {"val":
10}}, map[string]map[string]Quantity{"first": {"value": 10}}},
false},
{"same first-level key, second has larger sub-level map",
- inputs{map[string]map[string]int64{"first": {"val":
10}}, map[string]map[string]int64{"first": {"val": 10, "sum": 7}}},
+ inputs{map[string]map[string]Quantity{"first": {"val":
10}}, map[string]map[string]Quantity{"first": {"val": 10, "sum": 7}}},
false},
{"same first-level key, first has larger sub-level map",
- inputs{map[string]map[string]int64{"first": {"val": 10,
"sum": 7}}, map[string]map[string]int64{"first": {"val": 10}}},
+ inputs{map[string]map[string]Quantity{"first": {"val":
10, "sum": 7}}, map[string]map[string]Quantity{"first": {"val": 10}}},
false},
{"same keys and values",
- inputs{map[string]map[string]int64{"x": {"val": 10,
"sum": 7}}, map[string]map[string]int64{"x": {"val": 10, "sum": 7}}},
+ inputs{map[string]map[string]Quantity{"x": {"val": 10,
"sum": 7}}, map[string]map[string]Quantity{"x": {"val": 10, "sum": 7}}},
true},
}
for _, tt := range tests {
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index a58db862..3d2008f4 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -1054,15 +1054,17 @@ func TestCompleted(t *testing.T) {
func assertResourceUsage(t *testing.T, appSummary *ApplicationSummary,
memorySeconds int64, vcoresSecconds int64) {
detailedResource :=
appSummary.ResourceUsage.TrackedResourceMap[instType1]
- assert.Equal(t, memorySeconds, detailedResource["memory"])
- assert.Equal(t, vcoresSecconds, detailedResource["vcores"])
+ if detailedResource != nil {
+ assert.Equal(t, memorySeconds,
int64(detailedResource.Resources["memory"]))
+ assert.Equal(t, vcoresSecconds,
int64(detailedResource.Resources["vcores"]))
+ }
}
func assertPlaceHolderResource(t *testing.T, appSummary *ApplicationSummary,
memorySeconds int64,
vcoresSecconds int64) {
detailedResource :=
appSummary.PlaceholderResource.TrackedResourceMap[instType1]
- assert.Equal(t, memorySeconds, detailedResource["memory"])
- assert.Equal(t, vcoresSecconds, detailedResource["vcores"])
+ assert.Equal(t, memorySeconds,
int64(detailedResource.Resources["memory"]))
+ assert.Equal(t, vcoresSecconds,
int64(detailedResource.Resources["vcores"]))
}
func TestResourceUsageAggregation(t *testing.T) {
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index d1357dc8..c6db6280 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -1883,17 +1883,18 @@ func TestRequiredNodeAllocation(t *testing.T) {
func assertPreemptedResource(t *testing.T, appSummary
*objects.ApplicationSummary, memorySeconds int64,
vcoresSecconds int64) {
detailedResource :=
appSummary.PreemptedResource.TrackedResourceMap["UNKNOWN"]
- memValue, memPresent := detailedResource["memory"]
- vcoreValue, vcorePresent := detailedResource["vcore"]
+
+ memValue, memPresent := detailedResource.Resources["memory"]
+ vcoreValue, vcorePresent := detailedResource.Resources["vcore"]
if memorySeconds != -1 {
- assert.Equal(t, memorySeconds, memValue)
+ assert.Equal(t, memorySeconds, int64(memValue))
} else {
assert.Equal(t, memPresent, false)
}
if vcoresSecconds != -1 {
- assert.Equal(t, vcoresSecconds, vcoreValue)
+ assert.Equal(t, vcoresSecconds, int64(vcoreValue))
} else {
assert.Equal(t, vcorePresent, false)
}
@@ -1963,7 +1964,7 @@ func TestPreemption(t *testing.T) {
assertPreemptedResource(t, appSummary, -1, 5000)
appSummary = app2.GetApplicationSummary("default")
- assertPreemptedResource(t, appSummary, -1, 0)
+ assert.Assert(t,
appSummary.PreemptedResource.TrackedResourceMap["UNKNOWN"] == nil)
}
// Preemption followed by a normal allocation
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]