This is an automated email from the ASF dual-hosted git repository.

wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fd74361 [YUNIKORN-2087] reinstate node utilisation rest endpoint 
(#691)
0fd74361 is described below

commit 0fd743614b973650fc3fa24488be240e2e0b3121
Author: Wilfred Spiegelenburg <[email protected]>
AuthorDate: Thu Nov 9 13:58:54 2023 +1100

    [YUNIKORN-2087] reinstate node utilisation rest endpoint (#691)
    
    The node utilisation endpoint is required to be exposed for the
    dashboard to work after YUNIKORN-325
    Moving the endpoint to /ws/v1/scheduler/node-utilization
    The endpoint will need to change as part of YUNIKORN-2088
    
    Dominant resource is calculated for the root queue and that utilisation
    is shown for the nodes. The dominant resource type is the type with the
    highest usage ratio. A pre-defined ratio for zero usage and capacity is
    defined as: 0 capacity, 0 usage: 0; 0 capacity, >0 usage: 1; otherwise
    ratio = usage / capacity.
    
    Closes: #691
    
    Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
 pkg/common/resources/resources.go      |  41 +++++++++
 pkg/common/resources/resources_test.go |  26 ++++++
 pkg/webservice/handlers.go             |  76 ++++++++++++-----
 pkg/webservice/handlers_test.go        | 151 ++++++++++++++++++++++++++-------
 pkg/webservice/routes.go               |   6 ++
 pkg/webservice/webservice.go           |   4 +-
 6 files changed, 251 insertions(+), 53 deletions(-)

diff --git a/pkg/common/resources/resources.go 
b/pkg/common/resources/resources.go
index 2aee8c04..bf96e98f 100644
--- a/pkg/common/resources/resources.go
+++ b/pkg/common/resources/resources.go
@@ -993,3 +993,44 @@ func CalculateAbsUsedCapacity(capacity, used *Resource) 
*Resource {
        }
        return absResource
 }
+
+// DominantResourceType calculates the most used resource type based on the 
ratio of used compared to
+// the capacity. If a capacity type is set to 0 assume full usage.
+// Dominant type should be calculated with queue usage and capacity. Queue 
capacities should never
+// contain 0 values when there is also usage; however, in the root queue this 
could happen if the
+// last node reporting that resource was removed but not everything has been 
updated immediately.
+// Ignores resources types that are used but not defined in the capacity.
+func (r *Resource) DominantResourceType(capacity *Resource) string {
+       if r == nil || capacity == nil {
+               return ""
+       }
+       var div, temp float64
+       dominant := ""
+       for name, usedVal := range r.Resources {
+               capVal, ok := capacity.Resources[name]
+               if !ok {
+                       log.Log(log.Resources).Debug("missing resource in 
dominant calculation",
+                               zap.String("missing resource", name))
+                       continue
+               }
+               // calculate the ratio between usage and capacity
+               // ratio should be somewhere between 0 and 1, but do not 
restrict
+               // handle 0 values specifically just to be safe should never 
happen
+               if capVal == 0 {
+                       if usedVal == 0 {
+                               temp = 0 // no usage, no cap: consider empty
+                       } else {
+                               temp = 1 // usage, no cap: fully used
+                       }
+               } else {
+                       temp = float64(usedVal) / float64(capVal) // both not 
zero calculate ratio
+               }
+               // if we have exactly the same use the latest one
+               if temp >= div {
+                       div = temp
+                       dominant = name
+               }
+       }
+       return dominant
+}
diff --git a/pkg/common/resources/resources_test.go 
b/pkg/common/resources/resources_test.go
index 8460017c..5c32095c 100644
--- a/pkg/common/resources/resources_test.go
+++ b/pkg/common/resources/resources_test.go
@@ -1701,3 +1701,29 @@ func TestIsEmpty(t *testing.T) {
                })
        }
 }
+
+func TestResource_DominantResource(t *testing.T) {
+       tests := []struct {
+               name     string
+               used     *Resource
+               capacity *Resource
+               wantName string
+       }{
+               {"nil receiver", nil, Zero, ""},
+               {"nil cap", Zero, nil, ""},
+               {"zero cap", NewResourceFromMap(map[string]Quantity{"A": 10}), 
Zero, ""},
+               {"over cap", NewResourceFromMap(map[string]Quantity{"A": 20}), 
NewResourceFromMap(map[string]Quantity{"A": 10}), "A"},
+               {"zero usage exist", 
NewResourceFromMap(map[string]Quantity{"A": 0}), 
NewResourceFromMap(map[string]Quantity{"A": 10}), "A"},
+               {"usage not in cap", 
NewResourceFromMap(map[string]Quantity{"B": 10}), 
NewResourceFromMap(map[string]Quantity{"A": 10}), ""},
+               {"multiple usages", NewResourceFromMap(map[string]Quantity{"B": 
10, "A": 10}), NewResourceFromMap(map[string]Quantity{"A": 10, "B": 20}), "A"},
+               {"0 usage with 0 cap", 
NewResourceFromMap(map[string]Quantity{"A": 0}), 
NewResourceFromMap(map[string]Quantity{"A": 0}), "A"},
+               {"usage with 0 cap", 
NewResourceFromMap(map[string]Quantity{"A": 10, "B": 5}), 
NewResourceFromMap(map[string]Quantity{"A": 0, "B": 10}), "A"},
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       if got := tt.used.DominantResourceType(tt.capacity); 
got != tt.wantName {
+                               t.Errorf("DominantResourceType() = %v, want 
%v", got, tt.wantName)
+                       }
+               })
+       }
+}
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index 4192093b..92cc472e 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -48,15 +48,17 @@ import (
        "github.com/apache/yunikorn-core/pkg/webservice/dao"
 )
 
-const PartitionDoesNotExists = "Partition not found"
-const MissingParamsName = "Missing parameters"
-const QueueDoesNotExists = "Queue not found"
-const UserDoesNotExists = "User not found"
-const GroupDoesNotExists = "Group not found"
-const UserNameMissing = "User name is missing"
-const GroupNameMissing = "Group name is missing"
-const ApplicationDoesNotExists = "Application not found"
-const NodeDoesNotExists = "Node not found"
+const (
+       PartitionDoesNotExists   = "Partition not found"
+       MissingParamsName        = "Missing parameters"
+       QueueDoesNotExists       = "Queue not found"
+       UserDoesNotExists        = "User not found"
+       GroupDoesNotExists       = "Group not found"
+       UserNameMissing          = "User name is missing"
+       GroupNameMissing         = "Group name is missing"
+       ApplicationDoesNotExists = "Application not found"
+       NodeDoesNotExists        = "Node not found"
+)
 
 func getStackInfo(w http.ResponseWriter, r *http.Request) {
        writeHeaders(w)
@@ -146,7 +148,7 @@ func getClusterJSON(partition *scheduler.PartitionContext) 
*dao.ClusterDAOInfo {
 
 func getClusterUtilJSON(partition *scheduler.PartitionContext) 
[]*dao.ClusterUtilDAOInfo {
        var utils []*dao.ClusterUtilDAOInfo
-       var getResource bool = true
+       var getResource = true
        total := partition.GetTotalPartitionResource()
        if resources.IsZero(total) {
                getResource = false
@@ -162,7 +164,7 @@ func getClusterUtilJSON(partition 
*scheduler.PartitionContext) []*dao.ClusterUti
                                ResourceType: name,
                                Total:        int64(total.Resources[name]),
                                Used:         int64(used.Resources[name]),
-                               Usage:        fmt.Sprintf("%d", int64(value)) + 
"%",
+                               Usage:        fmt.Sprintf("%d%%", int64(value)),
                        }
                        utils = append(utils, utilization)
                }
@@ -348,29 +350,57 @@ func getNodesDAO(entries []*objects.Node) 
[]*dao.NodeDAOInfo {
        return nodesDAO
 }
 
+// getNodeUtilisation loads the node utilisation based on the dominant 
resource used
+// for the default partition. Dominant resource is defined as the highest 
utilised resource
+// type on the root queue based on the registered resources.
+// Use the default partition until we get YUNIKORN-2088 fixed
+func getNodeUtilisation(w http.ResponseWriter, r *http.Request) {
+       writeHeaders(w)
+       partitionContext := 
schedulerContext.GetPartitionWithoutClusterID(configs.DefaultPartition)
+       if partitionContext == nil {
+               buildJSONErrorResponse(w, PartitionDoesNotExists, 
http.StatusBadRequest)
+               return
+       }
+       // calculate the dominant resource based on root queue usage and size
+       rootQ := partitionContext.GetQueue(configs.RootQueue)
+       rootMax := rootQ.GetMaxResource()
+       // if no nodes have been registered return an empty object
+       nodesDao := &dao.NodesUtilDAOInfo{}
+       if !resources.IsZero(rootMax) {
+               // if nothing is used we get an empty dominant resource and 
return an empty object
+               rootUsed := rootQ.GetAllocatedResource()
+               dominant := rootUsed.DominantResourceType(rootMax)
+               nodesDao = getNodesUtilJSON(partitionContext, dominant)
+       }
+       if err := json.NewEncoder(w).Encode(nodesDao); err != nil {
+               buildJSONErrorResponse(w, err.Error(), 
http.StatusInternalServerError)
+       }
+}
+
+// getNodesUtilJSON loads the nodes utilisation for a partition for a specific 
resource type.
 func getNodesUtilJSON(partition *scheduler.PartitionContext, name string) 
*dao.NodesUtilDAOInfo {
        mapResult := make([]int, 10)
        mapName := make([][]string, 10)
        var v float64
        var nodeUtil []*dao.NodeUtilDAOInfo
+       var idx int
        for _, node := range partition.GetNodes() {
-               resourceExist := true
-               // check resource exist or not
+               // check resource exist or not: only count if node advertises 
the resource
                total := node.GetCapacity()
-               if total.Resources[name] <= 0 {
-                       resourceExist = false
+               if _, ok := total.Resources[name]; !ok {
+                       continue
                }
                resourceAllocated := node.GetAllocatedResource()
-               if _, ok := resourceAllocated.Resources[name]; !ok {
-                       resourceExist = false
-               }
-               // if resource exist in node, record the bucket it should go
-               if resourceExist {
+               // if resource exist in node, record the bucket it should go 
into,
+               // otherwise none is used, and it should end up in the 0 bucket
+               if _, ok := resourceAllocated.Resources[name]; ok {
                        v = float64(resources.CalculateAbsUsedCapacity(total, 
resourceAllocated).Resources[name])
-                       idx := int(math.Dim(math.Ceil(v/10), 1))
-                       mapResult[idx]++
-                       mapName[idx] = append(mapName[idx], node.NodeID)
+                       idx = int(math.Dim(math.Ceil(v/10), 1))
+               } else {
+                       idx = 0
                }
+               mapResult[idx]++
+               mapName[idx] = append(mapName[idx], node.NodeID)
        }
        // put number of nodes and node name to different buckets
        for k := 0; k < 10; k++ {
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index 136f6b1f..b88117ff 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -661,11 +661,14 @@ func TestGetNodesUtilJSON(t *testing.T) {
 
        // create test nodes
        nodeRes := 
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 
1000, siCommon.CPU: 1000}).ToProto()
-       nodeRes2 := 
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 
1000, siCommon.CPU: 1000, "GPU": 10}).ToProto()
        node1ID := "node-1"
        node1 := objects.NewNode(&si.NodeInfo{NodeID: node1ID, 
SchedulableResource: nodeRes})
        node2ID := "node-2"
+       nodeRes2 := 
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 
1000, siCommon.CPU: 1000, "GPU": 10}).ToProto()
        node2 := objects.NewNode(&si.NodeInfo{NodeID: node2ID, 
SchedulableResource: nodeRes2})
+       node3ID := "node-3"
+       nodeCPU := 
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.CPU: 
1000}).ToProto()
+       node3 := objects.NewNode(&si.NodeInfo{NodeID: node3ID, 
SchedulableResource: nodeCPU})
 
        // create test allocations
        resAlloc1 := 
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 
500, siCommon.CPU: 300})
@@ -678,36 +681,126 @@ func TestGetNodesUtilJSON(t *testing.T) {
        allocs = []*objects.Allocation{objects.NewAllocation("alloc-2-uuid", 
node2ID, ask2)}
        err = partition.AddNode(node2, allocs)
        assert.NilError(t, err, "add node to partition should not have failed")
+       err = partition.AddNode(node3, nil)
+       assert.NilError(t, err, "add node to partition should not have failed")
+
+       // two nodes advertise memory: must show up in the list
+       result := getNodesUtilJSON(partition, siCommon.Memory)
+       subResult := result.NodesUtil
+       assert.Equal(t, result.ResourceType, siCommon.Memory)
+       assert.Equal(t, subResult[2].NumOfNodes, int64(1))
+       assert.Equal(t, subResult[4].NumOfNodes, int64(1))
+       assert.Equal(t, subResult[2].NodeNames[0], node2ID)
+       assert.Equal(t, subResult[4].NodeNames[0], node1ID)
+
+       // three nodes advertise cpu: must show up in the list
+       result = getNodesUtilJSON(partition, siCommon.CPU)
+       subResult = result.NodesUtil
+       assert.Equal(t, result.ResourceType, siCommon.CPU)
+       assert.Equal(t, subResult[0].NumOfNodes, int64(1))
+       assert.Equal(t, subResult[0].NodeNames[0], node3ID)
+       assert.Equal(t, subResult[2].NumOfNodes, int64(1))
+       assert.Equal(t, subResult[2].NodeNames[0], node1ID)
+       assert.Equal(t, subResult[4].NumOfNodes, int64(1))
+       assert.Equal(t, subResult[4].NodeNames[0], node2ID)
+
+       // one node advertise GPU: must show up in the list
+       result = getNodesUtilJSON(partition, "GPU")
+       subResult = result.NodesUtil
+       assert.Equal(t, result.ResourceType, "GPU")
+       assert.Equal(t, subResult[4].NumOfNodes, int64(1))
+       assert.Equal(t, subResult[4].NodeNames[0], node2ID)
+
+       result = getNodesUtilJSON(partition, "non-exist")
+       subResult = result.NodesUtil
+       assert.Equal(t, result.ResourceType, "non-exist")
+       assert.Equal(t, subResult[0].NumOfNodes, int64(0))
+       assert.Equal(t, len(subResult[0].NodeNames), 0)
+}
+
+func TestGetNodeUtilisation(t *testing.T) {
+       NewWebApp(&scheduler.ClusterContext{}, nil)
+
+       // var req *http.Request
+       req, err := http.NewRequest("GET", "/ws/v1/scheduler/node-utilization", 
strings.NewReader(""))
+       assert.NilError(t, err, "Get node utilisation Handler request failed")
+       req = req.WithContext(context.TODO())
+       resp := &MockResponseWriter{}
+
+       getNodeUtilisation(resp, req)
+       var errInfo dao.YAPIError
+       err = json.Unmarshal(resp.outputBytes, &errInfo)
+       assert.NilError(t, err, "getNodeUtilisation should have returned and 
error")
+
+       partition := setup(t, configDefault, 1)
+       utilisation := &dao.NodesUtilDAOInfo{}
+       err = json.Unmarshal(resp.outputBytes, utilisation)
+       assert.NilError(t, err, "getNodeUtilisation should have returned an 
empty object")
+       assert.Equal(t, utilisation.ResourceType, "", "unexpected type 
returned")
+       assert.Equal(t, len(utilisation.NodesUtil), 0, "no nodes should be 
returned")
+       assert.Assert(t, confirmNodeCount(utilisation.NodesUtil, 0), 
"unexpected number of nodes returned should be 0")
+
+       // create test nodes
+       nodeRes := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
10}).ToProto()
+       nodeRes2 := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, 
"second": 5}).ToProto()
+       node1ID := "node-1"
+       node1 := objects.NewNode(&si.NodeInfo{NodeID: node1ID, 
SchedulableResource: nodeRes})
+       node2ID := "node-2"
+       node2 := objects.NewNode(&si.NodeInfo{NodeID: node2ID, 
SchedulableResource: nodeRes2})
+
+       err = partition.AddNode(node1, nil)
+       assert.NilError(t, err, "add node to partition should not have failed")
+       err = partition.AddNode(node2, nil)
+       assert.NilError(t, err, "add node to partition should not have failed")
 
        // get nodes utilization
-       res1 := getNodesUtilJSON(partition, siCommon.Memory)
-       res2 := getNodesUtilJSON(partition, siCommon.CPU)
-       res3 := getNodesUtilJSON(partition, "GPU")
-       resNon := getNodesUtilJSON(partition, "non-exist")
-       subres1 := res1.NodesUtil
-       subres2 := res2.NodesUtil
-       subres3 := res3.NodesUtil
-       subresNon := resNon.NodesUtil
-
-       assert.Equal(t, res1.ResourceType, siCommon.Memory)
-       assert.Equal(t, subres1[2].NumOfNodes, int64(1))
-       assert.Equal(t, subres1[4].NumOfNodes, int64(1))
-       assert.Equal(t, subres1[2].NodeNames[0], node2ID)
-       assert.Equal(t, subres1[4].NodeNames[0], node1ID)
-
-       assert.Equal(t, res2.ResourceType, siCommon.CPU)
-       assert.Equal(t, subres2[2].NumOfNodes, int64(1))
-       assert.Equal(t, subres2[4].NumOfNodes, int64(1))
-       assert.Equal(t, subres2[2].NodeNames[0], node1ID)
-       assert.Equal(t, subres2[4].NodeNames[0], node2ID)
-
-       assert.Equal(t, res3.ResourceType, "GPU")
-       assert.Equal(t, subres3[4].NumOfNodes, int64(1))
-       assert.Equal(t, subres3[4].NodeNames[0], node2ID)
-
-       assert.Equal(t, resNon.ResourceType, "non-exist")
-       assert.Equal(t, subresNon[0].NumOfNodes, int64(0))
-       assert.Equal(t, len(subresNon[0].NodeNames), 0)
+       getNodeUtilisation(resp, req)
+       utilisation = &dao.NodesUtilDAOInfo{}
+       err = json.Unmarshal(resp.outputBytes, utilisation)
+       assert.NilError(t, err, "getNodeUtilisation should have returned an 
object")
+       assert.Equal(t, utilisation.ResourceType, "", "unexpected type 
returned")
+       assert.Equal(t, len(utilisation.NodesUtil), 10, "empty usage: 
unexpected bucket count returned")
+       assert.Assert(t, confirmNodeCount(utilisation.NodesUtil, 0), 
"unexpected number of nodes returned should be 0")
+
+       resAlloc := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+       ask := objects.NewAllocationAsk("alloc-1", "app", resAlloc)
+       alloc := objects.NewAllocation("alloc-1-uuid", node1ID, ask)
+       assert.Assert(t, node1.AddAllocation(alloc), "unexpected failure adding 
allocation to node")
+       rootQ := partition.GetQueue("root")
+       err = rootQ.IncAllocatedResource(resAlloc, false)
+       assert.NilError(t, err, "unexpected error returned setting allocated 
resource on queue")
+       // get nodes utilization
+       getNodeUtilisation(resp, req)
+       utilisation = &dao.NodesUtilDAOInfo{}
+       err = json.Unmarshal(resp.outputBytes, utilisation)
+       assert.NilError(t, err, "getNodeUtilisation should have returned an 
object")
+       assert.Equal(t, utilisation.ResourceType, "first", "expected first as 
type returned")
+       assert.Equal(t, len(utilisation.NodesUtil), 10, "empty usage: 
unexpected bucket count returned")
+       assert.Assert(t, confirmNodeCount(utilisation.NodesUtil, 2), 
"unexpected number of nodes returned should be 2")
+
+       // make second type dominant by using all
+       resAlloc = 
resources.NewResourceFromMap(map[string]resources.Quantity{"second": 5})
+       ask = objects.NewAllocationAsk("alloc-2", "app", resAlloc)
+       alloc = objects.NewAllocation("alloc-2-uuid", node2ID, ask)
+       assert.Assert(t, node2.AddAllocation(alloc), "unexpected failure adding 
allocation to node")
+       err = rootQ.IncAllocatedResource(resAlloc, false)
+       assert.NilError(t, err, "unexpected error returned setting allocated 
resource on queue")
+       // get nodes utilization
+       getNodeUtilisation(resp, req)
+       utilisation = &dao.NodesUtilDAOInfo{}
+       err = json.Unmarshal(resp.outputBytes, utilisation)
+       assert.NilError(t, err, "getNodeUtilisation should have returned an 
object")
+       assert.Equal(t, utilisation.ResourceType, "second", "expected second as 
type returned")
+       assert.Equal(t, len(utilisation.NodesUtil), 10, "empty usage: 
unexpected bucket count returned")
+       assert.Assert(t, confirmNodeCount(utilisation.NodesUtil, 1), 
"unexpected number of nodes returned should be 1")
+}
+
+func confirmNodeCount(info []*dao.NodeUtilDAOInfo, count int64) bool {
+       var total int64
+       for _, node := range info {
+               total += node.NumOfNodes
+       }
+       return total == count
 }
 
 func addAndConfirmApplicationExists(t *testing.T, partitionName string, 
partition *scheduler.PartitionContext, appName string) *objects.Application {
diff --git a/pkg/webservice/routes.go b/pkg/webservice/routes.go
index e4df1c18..78957de0 100644
--- a/pkg/webservice/routes.go
+++ b/pkg/webservice/routes.go
@@ -253,4 +253,10 @@ var webRoutes = routes{
                "/ws/v1/scheduler/healthcheck",
                checkHealthStatus,
        },
+       route{
+               "Scheduler",
+               "GET",
+               "/ws/v1/scheduler/node-utilization",
+               getNodeUtilisation,
+       },
 }
diff --git a/pkg/webservice/webservice.go b/pkg/webservice/webservice.go
index ed5cef87..bf064dff 100644
--- a/pkg/webservice/webservice.go
+++ b/pkg/webservice/webservice.go
@@ -20,6 +20,7 @@ package webservice
 
 import (
        "context"
+       "errors"
        "net/http"
        "time"
 
@@ -60,6 +61,7 @@ func loggingHandler(inner http.Handler, name string) 
http.HandlerFunc {
        }
 }
 
+// StartWebApp starts the web app on the default port.
 // TODO we need the port to be configurable
 func (m *WebService) StartWebApp() {
        router := newRouter()
@@ -68,7 +70,7 @@ func (m *WebService) StartWebApp() {
        log.Log(log.REST).Info("web-app started", zap.Int("port", 9080))
        go func() {
                httpError := m.httpServer.ListenAndServe()
-               if httpError != nil && httpError != http.ErrServerClosed {
+               if httpError != nil && !errors.Is(httpError, 
http.ErrServerClosed) {
                        log.Log(log.REST).Error("HTTP serving error",
                                zap.Error(httpError))
                }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to