This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 9f900d2e [YUNIKORN-2089] Moved usedResource type and tests to their
own files (#713)
9f900d2e is described below
commit 9f900d2e5b196b90c427bd8095f68e6a3fb749ae
Author: Vinayak Hegde <[email protected]>
AuthorDate: Mon Nov 27 16:42:21 2023 +0100
[YUNIKORN-2089] Moved usedResource type and tests to their own files (#713)
Closes: #713
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/common/resources/resources.go | 58 -------
pkg/common/resources/tracked_resources.go | 109 +++++++++++++
pkg/common/resources/tracked_resources_test.go | 212 +++++++++++++++++++++++++
pkg/scheduler/objects/application.go | 6 +-
4 files changed, 324 insertions(+), 61 deletions(-)
diff --git a/pkg/common/resources/resources.go
b/pkg/common/resources/resources.go
index 52a107cb..efa94bc4 100644
--- a/pkg/common/resources/resources.go
+++ b/pkg/common/resources/resources.go
@@ -25,8 +25,6 @@ import (
"sort"
"strconv"
"strings"
- "sync"
- "time"
"go.uber.org/zap"
@@ -46,58 +44,6 @@ func (q Quantity) string() string {
return strconv.FormatInt(int64(q), 10)
}
-// Util struct to keep track of application resource usage
-type TrackedResource struct {
- // Two level map for aggregated resource usage
- // With instance type being the top level key, the mapped value is a
map:
- // resource type (CPU, memory etc) -> the aggregated used time (in
seconds) of the resource type
- //
- TrackedResourceMap map[string]map[string]int64
-
- sync.RWMutex
-}
-
-func (ur *TrackedResource) Clone() *TrackedResource {
- if ur == nil {
- return nil
- }
- ret := NewTrackedResource()
- ur.RLock()
- defer ur.RUnlock()
- for k, v := range ur.TrackedResourceMap {
- dest := make(map[string]int64)
- for key, element := range v {
- dest[key] = element
- }
- ret.TrackedResourceMap[k] = dest
- }
- return ret
-}
-
-// Aggregate the resource usage to UsedResourceMap[instType]
-// The time the given resource used is the delta between the resource
createTime and currentTime
-func (ur *TrackedResource) AggregateTrackedResource(instType string,
- resource *Resource, bindTime time.Time) {
- ur.Lock()
- defer ur.Unlock()
-
- releaseTime := time.Now()
- timeDiff := int64(releaseTime.Sub(bindTime).Seconds())
- aggregatedResourceTime, ok := ur.TrackedResourceMap[instType]
- if !ok {
- aggregatedResourceTime = map[string]int64{}
- }
- for key, element := range resource.Resources {
- curUsage, ok := aggregatedResourceTime[key]
- if !ok {
- curUsage = 0
- }
- curUsage += int64(element) * timeDiff // resource size times
timeDiff
- aggregatedResourceTime[key] = curUsage
- }
- ur.TrackedResourceMap[instType] = aggregatedResourceTime
-}
-
// Never update value of Zero
var Zero = NewResource()
@@ -157,10 +103,6 @@ func NewResourceFromConf(configMap map[string]string)
(*Resource, error) {
return res, nil
}
-func NewTrackedResource() *TrackedResource {
- return &TrackedResource{TrackedResourceMap:
make(map[string]map[string]int64)}
-}
-
func (r *Resource) String() string {
if r == nil {
return "nil resource"
diff --git a/pkg/common/resources/tracked_resources.go
b/pkg/common/resources/tracked_resources.go
new file mode 100644
index 00000000..209e1c11
--- /dev/null
+++ b/pkg/common/resources/tracked_resources.go
@@ -0,0 +1,109 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package resources
+
+import (
+	"fmt"
+	"sort"
+	"strings"
+	"sync"
+	"time"
+)
+
+// TrackedResource is a utility struct to keep track of application resource
usage.
+type TrackedResource struct {
+ // TrackedResourceMap is a two-level map for aggregated resource usage.
+ // The top-level key is the instance type, and the value is a map:
+ // resource type (CPU, memory, etc.) -> aggregated used time (in
seconds) of the resource type.
+ TrackedResourceMap map[string]map[string]int64
+
+ sync.RWMutex // embedded lock; String, Clone and AggregateTrackedResource hold it while accessing TrackedResourceMap
+}
+
+// NewTrackedResource returns an empty TrackedResource whose usage map is
+// already allocated and can be written to right away.
+func NewTrackedResource() *TrackedResource {
+	usage := make(map[string]map[string]int64)
+	return &TrackedResource{TrackedResourceMap: usage}
+}
+
+// NewTrackedResourceFromMap wraps the given map in a TrackedResource.
+// The map is stored as is, without copying, so the caller and the returned
+// TrackedResource share it. A nil map yields an empty TrackedResource.
+// Intended for testing purposes only.
+func NewTrackedResourceFromMap(m map[string]map[string]int64) *TrackedResource {
+	if m != nil {
+		return &TrackedResource{TrackedResourceMap: m}
+	}
+	return NewTrackedResource()
+}
+
+// String returns a human-readable summary of the tracked usage in the form
+// "TrackedResource{instanceType:resourceType=usage,...}".
+// A nil receiver yields "nil tracked resource", in line with Resource.String,
+// so the value can be printed or logged without a prior nil check.
+// The entries are sorted so that the output is stable between calls; the
+// iteration order of Go maps is otherwise unspecified.
+func (tr *TrackedResource) String() string {
+	if tr == nil {
+		return "nil tracked resource"
+	}
+	tr.RLock()
+	defer tr.RUnlock()
+
+	var resourceUsage []string
+	for instanceType, resourceTypeMap := range tr.TrackedResourceMap {
+		for resourceType, usageTime := range resourceTypeMap {
+			resourceUsage = append(resourceUsage, fmt.Sprintf("%s:%s=%d", instanceType, resourceType, usageTime))
+		}
+	}
+	// Sort the formatted entries to get a deterministic result.
+	sort.Strings(resourceUsage)
+
+	return fmt.Sprintf("TrackedResource{%s}", strings.Join(resourceUsage, ","))
+}
+
+// Clone returns a deep copy of the TrackedResource: the outer map and every
+// per-instance-type map are duplicated, so later changes to one copy do not
+// show up in the other. A nil receiver yields nil.
+func (tr *TrackedResource) Clone() *TrackedResource {
+	if tr == nil {
+		return nil
+	}
+	clone := NewTrackedResource()
+
+	// Hold the read lock while walking the source maps.
+	tr.RLock()
+	defer tr.RUnlock()
+	for instType, usage := range tr.TrackedResourceMap {
+		copied := make(map[string]int64, len(usage))
+		for resType, used := range usage {
+			copied[resType] = used
+		}
+		clone.TrackedResourceMap[instType] = copied
+	}
+	return clone
+}
+
+// AggregateTrackedResource adds the usage of the given resource to
+// TrackedResourceMap[instType].
+// For every resource type in resource, the amount added is its quantity
+// multiplied by the number of whole seconds elapsed between bindTime and the
+// moment this method is called.
+// A nil resource is ignored and leaves the TrackedResource untouched.
+func (tr *TrackedResource) AggregateTrackedResource(instType string,
+	resource *Resource, bindTime time.Time) {
+	if resource == nil {
+		return
+	}
+	tr.Lock()
+	defer tr.Unlock()
+
+	// A TrackedResource that was not created through NewTrackedResource has a
+	// nil map; assigning into it would panic, so allocate it on first use.
+	if tr.TrackedResourceMap == nil {
+		tr.TrackedResourceMap = make(map[string]map[string]int64)
+	}
+
+	// The conversion to int64 drops fractional seconds.
+	timeDiff := int64(time.Since(bindTime).Seconds())
+
+	// Testing for nil covers both an unknown instance type and an instance
+	// type that is present but mapped to a nil inner map.
+	aggregatedResourceTime := tr.TrackedResourceMap[instType]
+	if aggregatedResourceTime == nil {
+		aggregatedResourceTime = make(map[string]int64, len(resource.Resources))
+	}
+	for key, element := range resource.Resources {
+		// resource size times timeDiff; a key that is not present yet reads as zero
+		aggregatedResourceTime[key] += int64(element) * timeDiff
+	}
+	tr.TrackedResourceMap[instType] = aggregatedResourceTime
+}
diff --git a/pkg/common/resources/tracked_resources_test.go
b/pkg/common/resources/tracked_resources_test.go
new file mode 100644
index 00000000..0e204379
--- /dev/null
+++ b/pkg/common/resources/tracked_resources_test.go
@@ -0,0 +1,212 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package resources
+
+import (
+ "fmt"
+ "reflect"
+ "testing"
+ "time"
+)
+
+func CheckLenOfTrackedResource(res *TrackedResource, expected int) (bool,
string) {
+ if got := len(res.TrackedResourceMap); expected == 0 && (res == nil ||
got != expected) {
+ return false, fmt.Sprintf("input with empty and nil should be a
empty tracked resource: Expected %d, got %d", expected, got)
+ }
+ if got := len(res.TrackedResourceMap); got != expected {
+ return false, fmt.Sprintf("Length of tracked resources is
wrong: Expected %d, got %d", expected, got)
+ }
+ return true, ""
+}
+
+func CheckResourceValueOfTrackedResource(res *TrackedResource, expected
map[string]map[string]int64) (bool, string) {
+ for instanceType, resources := range expected {
+ for key, value := range resources {
+ if got := res.TrackedResourceMap[instanceType][key];
got != value {
+ return false, fmt.Sprintf("instance type %s,
resource %s, expected %d, got %d", instanceType, key, value, got)
+ }
+ }
+ }
+ return true, ""
+}
+
+// TestNewTrackedResourceFromMap checks that NewTrackedResourceFromMap produces a
+// TrackedResource holding the expected number of instance types and the
+// expected values, including for nil and empty input maps.
+func TestNewTrackedResourceFromMap(t *testing.T) {
+	type outputs struct {
+		length           int
+		trackedResources map[string]map[string]int64
+	}
+	var tests = []struct {
+		caseName string
+		input    map[string]map[string]int64
+		expected outputs
+	}{
+		{
+			caseName: "nil",
+			input:    nil,
+			expected: outputs{length: 0, trackedResources: map[string]map[string]int64{}},
+		},
+		{
+			caseName: "empty",
+			input:    map[string]map[string]int64{},
+			expected: outputs{length: 0, trackedResources: map[string]map[string]int64{}},
+		},
+		{
+			caseName: "tracked resources of one instance type",
+			input:    map[string]map[string]int64{"instanceType1": {"first": 1}},
+			expected: outputs{length: 1, trackedResources: map[string]map[string]int64{"instanceType1": {"first": 1}}},
+		},
+		{
+			caseName: "tracked resources of two instance type",
+			input:    map[string]map[string]int64{"instanceType1": {"first": 0}, "instanceType2": {"second": -1}},
+			expected: outputs{length: 2, trackedResources: map[string]map[string]int64{"instanceType1": {"first": 0}, "instanceType2": {"second": -1}}},
+		},
+		{
+			caseName: "Multiple tracked resources for one instance type",
+			input:    map[string]map[string]int64{"instanceType1": {"first": 1, "second": -2, "third": 3}},
+			expected: outputs{length: 1, trackedResources: map[string]map[string]int64{"instanceType1": {"first": 1, "second": -2, "third": 3}}},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.caseName, func(t *testing.T) {
+			res := NewTrackedResourceFromMap(tt.input)
+			// The values are only compared once the length is known to be right.
+			lenOK, lenMsg := CheckLenOfTrackedResource(res, tt.expected.length)
+			if !lenOK {
+				t.Error(lenMsg)
+				return
+			}
+			if valOK, valMsg := CheckResourceValueOfTrackedResource(res, tt.expected.trackedResources); !valOK {
+				t.Error(valMsg)
+			}
+		})
+	}
+}
+
+// TestTrackedResourceClone verifies that Clone returns a distinct
+// TrackedResource with equal content that does not share its inner maps with
+// the original, and that cloning a nil TrackedResource yields nil.
+func TestTrackedResourceClone(t *testing.T) {
+	// NewTrackedResourceFromMap returns a non-nil TrackedResource even for a nil
+	// map, so the nil receiver has to be exercised explicitly.
+	var nilTracked *TrackedResource
+	if cloned := nilTracked.Clone(); cloned != nil {
+		t.Errorf("clone of a nil trackedResource should be nil, got: %v", cloned)
+	}
+
+	var tests = []struct {
+		caseName string
+		input    map[string]map[string]int64
+	}{
+		{
+			"Nil check",
+			nil,
+		},
+		{
+			"No Resources in TrackedResources",
+			map[string]map[string]int64{},
+		},
+		{
+			"Proper TrackedResource mappings",
+			map[string]map[string]int64{"instanceType1": {"first": 1, "second": -2}, "instanceType2": {"second": -2, "third": 3}},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.caseName, func(t *testing.T) {
+			original := NewTrackedResourceFromMap(tt.input)
+			cloned := original.Clone()
+			if original == cloned {
+				t.Errorf("cloned trackedResources pointers are equal, should not be")
+			}
+			if !reflect.DeepEqual(original.TrackedResourceMap, cloned.TrackedResourceMap) {
+				t.Errorf("cloned trackedResources are not equal: %v / %v", original, cloned)
+			}
+			// A deep copy must not share the inner maps: a key written to the
+			// clone may not become visible through the original.
+			for instType, usage := range cloned.TrackedResourceMap {
+				usage["cloneOnly"] = 1
+				if _, found := original.TrackedResourceMap[instType]["cloneOnly"]; found {
+					t.Errorf("clone shares the resource map of instance type %s with the original", instType)
+				}
+			}
+		})
+	}
+}
+
+// TestTrackedResourceAggregateTrackedResource tests the AggregateTrackedResource function.
+// The modifications to the TrackedResource depend on time.Now(), making it challenging
+// to construct an expected TrackedResource for verification.
+// To address this, we use the bindTime passed in and set it to a known time before "now".
+// This provides a reasonably accurate prediction range, considering only whole seconds of the difference.
+// The likelihood of a test failure due to this should be extremely low.
+func TestTrackedResourceAggregateTrackedResource(t *testing.T) {
+	type inputs struct {
+		trackedResource map[string]map[string]int64
+		instType        string
+		otherResource   map[string]Quantity
+		bindTime        time.Time
+	}
+
+	var tests = []struct {
+		caseName                string
+		input                   inputs
+		expectedTrackedResource map[string]map[string]int64
+	}{
+		{
+			"Resource to be aggregated Nil Check",
+			inputs{
+				map[string]map[string]int64{"instanceType1": {"first": 1, "second": -2}},
+				"instanceType1",
+				nil,
+				time.Now().Add(-time.Minute),
+			},
+			map[string]map[string]int64{"instanceType1": {"first": 1, "second": -2}},
+		},
+		{
+			"Resource to be aggregated is Empty",
+			inputs{
+				map[string]map[string]int64{"instanceType1": {"first": 1, "second": -2}},
+				"instanceType1",
+				map[string]Quantity{},
+				time.Now().Add(-time.Minute),
+			},
+			map[string]map[string]int64{"instanceType1": {"first": 1, "second": -2}},
+		},
+		{
+			"TrackedResource is empty",
+			inputs{
+				map[string]map[string]int64{},
+				"instanceType1",
+				map[string]Quantity{"first": 1, "second": 2},
+				time.Now().Add(-time.Minute),
+			},
+			map[string]map[string]int64{"instanceType1": {"first": 60, "second": 120}},
+		},
+		{
+			"With Negative Values Involved",
+			inputs{
+				map[string]map[string]int64{"instanceType1": {"first": 1, "second": -2}},
+				"instanceType1",
+				map[string]Quantity{"first": 1, "second": 2},
+				time.Now().Add(-time.Minute),
+			},
+			map[string]map[string]int64{"instanceType1": {"first": 61, "second": 118}},
+		},
+		{
+			"Multiple Instance Types",
+			inputs{
+				map[string]map[string]int64{"instanceType1": {"first": 1, "second": 2}, "instanceType2": {"third": 3, "four": 4}},
+				"instanceType2",
+				map[string]Quantity{"first": 1, "second": 2},
+				time.Now().Add(-time.Minute),
+			},
+			map[string]map[string]int64{"instanceType1": {"first": 1, "second": 2}, "instanceType2": {"first": 60, "second": 120, "third": 3, "four": 4}},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.caseName, func(t *testing.T) {
+			original := NewTrackedResourceFromMap(tt.input.trackedResource)
+
+			// A nil otherResource stands for a nil *Resource. Wrapping it in a
+			// Resource would produce a non-nil pointer and bypass the nil guard
+			// of AggregateTrackedResource that the first case is meant to cover.
+			var other *Resource
+			if tt.input.otherResource != nil {
+				other = &Resource{Resources: tt.input.otherResource}
+			}
+			original.AggregateTrackedResource(tt.input.instType, other, tt.input.bindTime)
+			expected := NewTrackedResourceFromMap(tt.expectedTrackedResource)
+
+			if !reflect.DeepEqual(original.TrackedResourceMap, expected.TrackedResourceMap) {
+				t.Errorf("trackedResources are not equal, original trackedResource after aggregation: %v / expected: %v", original, expected)
+			}
+		})
+	}
+}
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index 086f0219..db8f4485 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -142,9 +142,9 @@ func (as *ApplicationSummary) DoLogging() {
zap.String("queue", as.Queue),
zap.String("state", as.State),
zap.String("rmID", as.RmID),
- zap.Any("resourceUsage", as.ResourceUsage.TrackedResourceMap),
- zap.Any("preemptedResource",
as.PreemptedResource.TrackedResourceMap),
- zap.Any("placeHolderResource",
as.PlaceholderResource.TrackedResourceMap),
+ zap.Stringer("resourceUsage", as.ResourceUsage),
+ zap.Stringer("preemptedResource", as.PreemptedResource),
+ zap.Stringer("placeHolderResource", as.PlaceholderResource),
)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]