This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f900d2e [YUNIKORN-2089] Moved usedResource type and tests to their 
own files (#713)
9f900d2e is described below

commit 9f900d2e5b196b90c427bd8095f68e6a3fb749ae
Author: Vinayak Hegde <[email protected]>
AuthorDate: Mon Nov 27 16:42:21 2023 +0100

    [YUNIKORN-2089] Moved usedResource type and tests to their own files (#713)
    
    Closes: #713
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/common/resources/resources.go              |  58 -------
 pkg/common/resources/tracked_resources.go      | 109 +++++++++++++
 pkg/common/resources/tracked_resources_test.go | 212 +++++++++++++++++++++++++
 pkg/scheduler/objects/application.go           |   6 +-
 4 files changed, 324 insertions(+), 61 deletions(-)

diff --git a/pkg/common/resources/resources.go 
b/pkg/common/resources/resources.go
index 52a107cb..efa94bc4 100644
--- a/pkg/common/resources/resources.go
+++ b/pkg/common/resources/resources.go
@@ -25,8 +25,6 @@ import (
        "sort"
        "strconv"
        "strings"
-       "sync"
-       "time"
 
        "go.uber.org/zap"
 
@@ -46,58 +44,6 @@ func (q Quantity) string() string {
        return strconv.FormatInt(int64(q), 10)
 }
 
-// Util struct to keep track of application resource usage
-type TrackedResource struct {
-       // Two level map for aggregated resource usage
-       // With instance type being the top level key, the mapped value is a 
map:
-       //   resource type (CPU, memory etc) -> the aggregated used time (in 
seconds) of the resource type
-       //
-       TrackedResourceMap map[string]map[string]int64
-
-       sync.RWMutex
-}
-
-func (ur *TrackedResource) Clone() *TrackedResource {
-       if ur == nil {
-               return nil
-       }
-       ret := NewTrackedResource()
-       ur.RLock()
-       defer ur.RUnlock()
-       for k, v := range ur.TrackedResourceMap {
-               dest := make(map[string]int64)
-               for key, element := range v {
-                       dest[key] = element
-               }
-               ret.TrackedResourceMap[k] = dest
-       }
-       return ret
-}
-
-// Aggregate the resource usage to UsedResourceMap[instType]
-// The time the given resource used is the delta between the resource 
createTime and currentTime
-func (ur *TrackedResource) AggregateTrackedResource(instType string,
-       resource *Resource, bindTime time.Time) {
-       ur.Lock()
-       defer ur.Unlock()
-
-       releaseTime := time.Now()
-       timeDiff := int64(releaseTime.Sub(bindTime).Seconds())
-       aggregatedResourceTime, ok := ur.TrackedResourceMap[instType]
-       if !ok {
-               aggregatedResourceTime = map[string]int64{}
-       }
-       for key, element := range resource.Resources {
-               curUsage, ok := aggregatedResourceTime[key]
-               if !ok {
-                       curUsage = 0
-               }
-               curUsage += int64(element) * timeDiff // resource size times 
timeDiff
-               aggregatedResourceTime[key] = curUsage
-       }
-       ur.TrackedResourceMap[instType] = aggregatedResourceTime
-}
-
 // Never update value of Zero
 var Zero = NewResource()
 
@@ -157,10 +103,6 @@ func NewResourceFromConf(configMap map[string]string) 
(*Resource, error) {
        return res, nil
 }
 
-func NewTrackedResource() *TrackedResource {
-       return &TrackedResource{TrackedResourceMap: 
make(map[string]map[string]int64)}
-}
-
 func (r *Resource) String() string {
        if r == nil {
                return "nil resource"
diff --git a/pkg/common/resources/tracked_resources.go 
b/pkg/common/resources/tracked_resources.go
new file mode 100644
index 00000000..209e1c11
--- /dev/null
+++ b/pkg/common/resources/tracked_resources.go
@@ -0,0 +1,109 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package resources
+
+import (
+       "fmt"
+       "strings"
+       "sync"
+       "time"
+)
+
+// TrackedResource is a utility struct to keep track of application resource 
usage.
+type TrackedResource struct {
+       // TrackedResourceMap is a two-level map for aggregated resource usage.
+       // The top-level key is the instance type, and the value is a map:
+       //   resource type (CPU, memory, etc.) -> aggregated used time (in 
seconds) of the resource type.
+       TrackedResourceMap map[string]map[string]int64
+
+       sync.RWMutex
+}
+
+// NewTrackedResource creates a new instance of TrackedResource.
+func NewTrackedResource() *TrackedResource {
+       return &TrackedResource{TrackedResourceMap: 
make(map[string]map[string]int64)}
+}
+
+// NewTrackedResourceFromMap creates NewTrackedResource from the given map.
+// Using for Testing purpose only.
+func NewTrackedResourceFromMap(m map[string]map[string]int64) *TrackedResource 
{
+       if m == nil {
+               return NewTrackedResource()
+       }
+       return &TrackedResource{TrackedResourceMap: m}
+}
+
+func (tr *TrackedResource) String() string {
+       tr.RLock()
+       defer tr.RUnlock()
+
+       var resourceUsage []string
+       for instanceType, resourceTypeMap := range tr.TrackedResourceMap {
+               for resourceType, usageTime := range resourceTypeMap {
+                       resourceUsage = append(resourceUsage, 
fmt.Sprintf("%s:%s=%d", instanceType, resourceType, usageTime))
+               }
+       }
+
+       return fmt.Sprintf("TrackedResource{%s}", strings.Join(resourceUsage, 
","))
+}
+
+// Clone creates a deep copy of TrackedResource.
+func (tr *TrackedResource) Clone() *TrackedResource {
+       if tr == nil {
+               return nil
+       }
+       ret := NewTrackedResource()
+       tr.RLock()
+       defer tr.RUnlock()
+       for k, v := range tr.TrackedResourceMap {
+               dest := make(map[string]int64)
+               for key, element := range v {
+                       dest[key] = element
+               }
+               ret.TrackedResourceMap[k] = dest
+       }
+       return ret
+}
+
+// AggregateTrackedResource aggregates resource usage to 
TrackedResourceMap[instType].
+// The time the given resource used is the delta between the resource 
createTime and currentTime.
+func (tr *TrackedResource) AggregateTrackedResource(instType string,
+       resource *Resource, bindTime time.Time) {
+       if resource == nil {
+               return
+       }
+       tr.Lock()
+       defer tr.Unlock()
+
+       releaseTime := time.Now()
+       timeDiff := int64(releaseTime.Sub(bindTime).Seconds())
+       aggregatedResourceTime, ok := tr.TrackedResourceMap[instType]
+       if !ok {
+               aggregatedResourceTime = map[string]int64{}
+       }
+       for key, element := range resource.Resources {
+               curUsage, ok := aggregatedResourceTime[key]
+               if !ok {
+                       curUsage = 0
+               }
+               curUsage += int64(element) * timeDiff // resource size times 
timeDiff
+               aggregatedResourceTime[key] = curUsage
+       }
+       tr.TrackedResourceMap[instType] = aggregatedResourceTime
+}
diff --git a/pkg/common/resources/tracked_resources_test.go 
b/pkg/common/resources/tracked_resources_test.go
new file mode 100644
index 00000000..0e204379
--- /dev/null
+++ b/pkg/common/resources/tracked_resources_test.go
@@ -0,0 +1,212 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package resources
+
+import (
+       "fmt"
+       "reflect"
+       "testing"
+       "time"
+)
+
+func CheckLenOfTrackedResource(res *TrackedResource, expected int) (bool, 
string) {
+       if got := len(res.TrackedResourceMap); expected == 0 && (res == nil || 
got != expected) {
+               return false, fmt.Sprintf("input with empty and nil should be a 
empty tracked resource: Expected %d, got %d", expected, got)
+       }
+       if got := len(res.TrackedResourceMap); got != expected {
+               return false, fmt.Sprintf("Length of tracked resources is 
wrong: Expected %d, got %d", expected, got)
+       }
+       return true, ""
+}
+
+func CheckResourceValueOfTrackedResource(res *TrackedResource, expected 
map[string]map[string]int64) (bool, string) {
+       for instanceType, resources := range expected {
+               for key, value := range resources {
+                       if got := res.TrackedResourceMap[instanceType][key]; 
got != value {
+                               return false, fmt.Sprintf("instance type %s, 
resource %s, expected %d, got %d", instanceType, key, value, got)
+                       }
+               }
+       }
+       return true, ""
+}
+
+func TestNewTrackedResourceFromMap(t *testing.T) {
+       type outputs struct {
+               length           int
+               trackedResources map[string]map[string]int64
+       }
+       var tests = []struct {
+               caseName string
+               input    map[string]map[string]int64
+               expected outputs
+       }{
+               {
+                       "nil",
+                       nil,
+                       outputs{0, map[string]map[string]int64{}},
+               },
+               {
+                       "empty",
+                       map[string]map[string]int64{},
+                       outputs{0, map[string]map[string]int64{}},
+               },
+               {
+                       "tracked resources of one instance type",
+                       map[string]map[string]int64{"instanceType1": {"first": 
1}},
+                       outputs{1, map[string]map[string]int64{"instanceType1": 
{"first": 1}}},
+               },
+               {
+                       "tracked resources of two instance type",
+                       map[string]map[string]int64{"instanceType1": {"first": 
0}, "instanceType2": {"second": -1}},
+                       outputs{2, map[string]map[string]int64{"instanceType1": 
{"first": 0}, "instanceType2": {"second": -1}}},
+               },
+               {
+                       "Multiple tracked resources for one instance type",
+                       map[string]map[string]int64{"instanceType1": {"first": 
1, "second": -2, "third": 3}},
+                       outputs{1, map[string]map[string]int64{"instanceType1": 
{"first": 1, "second": -2, "third": 3}}},
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.caseName, func(t *testing.T) {
+                       res := NewTrackedResourceFromMap(tt.input)
+                       if ok, err := CheckLenOfTrackedResource(res, 
tt.expected.length); !ok {
+                               t.Error(err)
+                       } else {
+                               if ok, err := 
CheckResourceValueOfTrackedResource(res, tt.expected.trackedResources); !ok {
+                                       t.Error(err)
+                               }
+                       }
+               })
+       }
+}
+
+func TestTrackedResourceClone(t *testing.T) {
+       var tests = []struct {
+               caseName string
+               input    map[string]map[string]int64
+       }{
+               {
+                       "Nil check",
+                       nil,
+               },
+               {
+                       "No Resources in TrackedResources",
+                       map[string]map[string]int64{},
+               },
+               {
+                       "Proper TrackedResource mappings",
+                       map[string]map[string]int64{"instanceType1": {"first": 
1, "second": -2}, "instanceType2": {"second": -2, "third": 3}},
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.caseName, func(t *testing.T) {
+                       original := NewTrackedResourceFromMap(tt.input)
+                       cloned := original.Clone()
+                       if original == cloned {
+                               t.Errorf("cloned trackedResources pointers are 
equal, should not be")
+                       }
+                       if !reflect.DeepEqual(original.TrackedResourceMap, 
cloned.TrackedResourceMap) {
+                               t.Errorf("cloned trackedResources are not 
equal: %v / %v", original, cloned)
+                       }
+               })
+       }
+}
+
+// TestTrackedResourceAggregateTrackedResource tests the 
AggregateTrackedResource function.
+// The modifications to the TrackedResource depend on time.Now(), making it 
challenging
+// to construct an expected TrackedResource for verification.
+// To address this, we use bindTime passed in and set it to a known time 
before "now."
+// This provides a reasonably accurate prediction range, considering only 
whole seconds of the difference.
+// The likelihood of a test failure due to this should be extremely low.
+func TestTrackedResourceAggregateTrackedResource(t *testing.T) {
+       type inputs struct {
+               trackedResource map[string]map[string]int64
+               instType        string
+               otherResource   map[string]Quantity
+               bindTime        time.Time
+       }
+
+       var tests = []struct {
+               caseName                string
+               input                   inputs
+               expectedTrackedResource map[string]map[string]int64
+       }{
+               {
+                       "Resource to be aggregated Nil Check",
+                       inputs{
+                               map[string]map[string]int64{"instanceType1": 
{"first": 1, "second": -2}},
+                               "instanceType1",
+                               nil,
+                               time.Now().Add(-time.Minute),
+                       },
+                       map[string]map[string]int64{"instanceType1": {"first": 
1, "second": -2}},
+               },
+               {
+                       "Resource to be aggregated is Empty",
+                       inputs{
+                               map[string]map[string]int64{"instanceType1": 
{"first": 1, "second": -2}},
+                               "instanceType1",
+                               map[string]Quantity{},
+                               time.Now().Add(-time.Minute),
+                       },
+                       map[string]map[string]int64{"instanceType1": {"first": 
1, "second": -2}},
+               },
+               {
+                       "TrackedResource is empty",
+                       inputs{
+                               map[string]map[string]int64{},
+                               "instanceType1",
+                               map[string]Quantity{"first": 1, "second": 2},
+                               time.Now().Add(-time.Minute),
+                       },
+                       map[string]map[string]int64{"instanceType1": {"first": 
60, "second": 120}},
+               },
+               {
+                       "With Negative Values Involved",
+                       inputs{
+                               map[string]map[string]int64{"instanceType1": 
{"first": 1, "second": -2}},
+                               "instanceType1",
+                               map[string]Quantity{"first": 1, "second": 2},
+                               time.Now().Add(-time.Minute),
+                       },
+                       map[string]map[string]int64{"instanceType1": {"first": 
61, "second": 118}},
+               },
+               {
+                       "Multiple Instance Types",
+                       inputs{
+                               map[string]map[string]int64{"instanceType1": 
{"first": 1, "second": 2}, "instanceType2": {"third": 3, "four": 4}},
+                               "instanceType2",
+                               map[string]Quantity{"first": 1, "second": 2},
+                               time.Now().Add(-time.Minute),
+                       },
+                       map[string]map[string]int64{"instanceType1": {"first": 
1, "second": 2}, "instanceType2": {"first": 60, "second": 120, "third": 3, 
"four": 4}},
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.caseName, func(t *testing.T) {
+                       original := 
NewTrackedResourceFromMap(tt.input.trackedResource)
+                       original.AggregateTrackedResource(tt.input.instType, 
&Resource{tt.input.otherResource}, tt.input.bindTime)
+                       expected := 
NewTrackedResourceFromMap(tt.expectedTrackedResource)
+
+                       if !reflect.DeepEqual(original.TrackedResourceMap, 
expected.TrackedResourceMap) {
+                               t.Errorf("trackedResources are not equal, 
original trackedResource after aggrigation: %v / expected: %v", original, 
expected)
+                       }
+               })
+       }
+}
diff --git a/pkg/scheduler/objects/application.go 
b/pkg/scheduler/objects/application.go
index 086f0219..db8f4485 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -142,9 +142,9 @@ func (as *ApplicationSummary) DoLogging() {
                zap.String("queue", as.Queue),
                zap.String("state", as.State),
                zap.String("rmID", as.RmID),
-               zap.Any("resourceUsage", as.ResourceUsage.TrackedResourceMap),
-               zap.Any("preemptedResource", 
as.PreemptedResource.TrackedResourceMap),
-               zap.Any("placeHolderResource", 
as.PlaceholderResource.TrackedResourceMap),
+               zap.Stringer("resourceUsage", as.ResourceUsage),
+               zap.Stringer("preemptedResource", as.PreemptedResource),
+               zap.Stringer("placeHolderResource", as.PlaceholderResource),
        )
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to