This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new d8be4933 [YUNIKORN-2257] Add rest API to retrieve node utilization for multiple resource types (#762)
d8be4933 is described below

commit d8be49331e3b8f5b221d05e6d216056d05337ce6
Author: Yu-Lin Chen <[email protected]>
AuthorDate: Sun Jan 7 03:00:58 2024 +0800

    [YUNIKORN-2257] Add rest API to retrieve node utilization for multiple resource types (#762)
    
    Closes: #762
    
    Signed-off-by: Chia-Ping Tsai <[email protected]>
---
 pkg/webservice/dao/node_util.go |   6 ++
 pkg/webservice/handlers.go      |  77 ++++++++++++++++++++-
 pkg/webservice/handlers_test.go | 148 +++++++++++++++++++++++++++++++++++++---
 pkg/webservice/routes.go        |   7 ++
 4 files changed, 228 insertions(+), 10 deletions(-)

diff --git a/pkg/webservice/dao/node_util.go b/pkg/webservice/dao/node_util.go
index 0cac3c29..46ad7cd5 100644
--- a/pkg/webservice/dao/node_util.go
+++ b/pkg/webservice/dao/node_util.go
@@ -18,6 +18,12 @@
 
 package dao
 
+type PartitionNodesUtilDAOInfo struct {
+       ClusterId     string              `json:"clusterId"` // no omitempty, cluster id should not be empty
+       Partition     string              `json:"partition"` // no omitempty, partition should not be empty
+       NodesUtilList []*NodesUtilDAOInfo `json:"utilizations,omitempty"`
+}
+
 type NodesUtilDAOInfo struct {
        ResourceType string             `json:"type,omitempty"`
        NodesUtil    []*NodeUtilDAOInfo `json:"utilization,omitempty"`
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index 46bbb760..69c781f0 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -375,7 +375,8 @@ func getNodesDAO(entries []*objects.Node) []*dao.NodeDAOInfo {
 // getNodeUtilisation loads the node utilisation based on the dominant resource used
 // for the default partition. Dominant resource is defined as the highest utilised resource
 // type on the root queue based on the registered resources.
-// Use the default partition until we get YUNIKORN-2088 fixed
+// Only check the default partition
+// Deprecated - To be removed in next major release. Replaced with getNodesUtilisations
 func getNodeUtilisation(w http.ResponseWriter, r *http.Request) {
        writeHeaders(w)
        partitionContext := schedulerContext.GetPartitionWithoutClusterID(configs.DefaultPartition)
@@ -400,6 +401,7 @@ func getNodeUtilisation(w http.ResponseWriter, r *http.Request) {
 }
 
 // getNodesUtilJSON loads the nodes utilisation for a partition for a specific resource type.
+// Deprecated - To be removed in next major release. Replaced with getPartitionNodesUtilJSON
 func getNodesUtilJSON(partition *scheduler.PartitionContext, name string) *dao.NodesUtilDAOInfo {
        mapResult := make([]int, 10)
        mapName := make([][]string, 10)
@@ -439,6 +441,79 @@ func getNodesUtilJSON(partition *scheduler.PartitionContext, name string) *dao.N
        }
 }
 
+func getNodeUtilisations(w http.ResponseWriter, r *http.Request) {
+       writeHeaders(w)
+       var result []*dao.PartitionNodesUtilDAOInfo
+       for _, part := range schedulerContext.GetPartitionMapClone() {
+               result = append(result, getPartitionNodesUtilJSON(part))
+       }
+
+       if err := json.NewEncoder(w).Encode(result); err != nil {
+               buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
+       }
+}
+
+// getPartitionNodesUtilJSON retrieves the utilization of all resource types for nodes within a specific partition.
+func getPartitionNodesUtilJSON(partition *scheduler.PartitionContext) *dao.PartitionNodesUtilDAOInfo {
+       type UtilizationBucket struct {
+               NodeCount []int      // 10 buckets, each bucket contains number of nodes
+               NodeList  [][]string // 10 buckets, each bucket contains node name list
+       }
+       resourceBuckets := make(map[string]*UtilizationBucket) // key is resource type, value is UtilizationBucket
+
+       // put nodes to buckets
+       for _, node := range partition.GetNodes() {
+               capacity := node.GetCapacity()
+               resourceAllocated := node.GetAllocatedResource()
+               absUsedCapacity := resources.CalculateAbsUsedCapacity(capacity, resourceAllocated)
+
+               // append to bucket based on resource type, only count if node advertises the resource
+               for resourceType := range capacity.Resources {
+                       idx := 0
+                       if absValue, ok := absUsedCapacity.Resources[resourceType]; ok {
+                               v := float64(absValue)
+                               idx = int(math.Dim(math.Ceil(v/10), 1))
+                       }
+
+                       // create resource bucket if not exist
+                       if _, ok := resourceBuckets[resourceType]; !ok {
+                               resourceBuckets[resourceType] = &UtilizationBucket{
+                                       NodeCount: make([]int, 10),
+                                       NodeList:  make([][]string, 10),
+                               }
+                       }
+
+                       resourceBuckets[resourceType].NodeCount[idx]++
+                       resourceBuckets[resourceType].NodeList[idx] = append(resourceBuckets[resourceType].NodeList[idx], node.NodeID)
+               }
+       }
+
+       // build result
+       var nodesUtilList []*dao.NodesUtilDAOInfo
+       for resourceType, bucket := range resourceBuckets {
+               var nodesUtil []*dao.NodeUtilDAOInfo
+               for k := 0; k < 10; k++ {
+                       util := &dao.NodeUtilDAOInfo{
+                               BucketName: fmt.Sprintf("%d", k*10) + "-" + fmt.Sprintf("%d", (k+1)*10) + "%",
+                               NumOfNodes: int64(bucket.NodeCount[k]),
+                               NodeNames:  bucket.NodeList[k],
+                       }
+                       nodesUtil = append(nodesUtil, util)
+               }
+               nodeUtilization := &dao.NodesUtilDAOInfo{
+                       ResourceType: resourceType,
+                       NodesUtil:    nodesUtil,
+               }
+               nodesUtilList = append(nodesUtilList, nodeUtilization)
+       }
+
+       return &dao.PartitionNodesUtilDAOInfo{
+               ClusterId:     partition.RmID,
+               Partition:     common.GetPartitionNameWithoutClusterID(partition.Name),
+               NodesUtilList: nodesUtilList,
+       }
+}
+
 func getApplicationHistory(w http.ResponseWriter, r *http.Request) {
        writeHeaders(w)
 
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index 406261c5..9718ed9c 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -732,17 +732,10 @@ func TestGetNodeUtilisation(t *testing.T) {
        assert.Assert(t, confirmNodeCount(utilisation.NodesUtil, 0), "unexpected number of nodes returned should be 0")
 
        // create test nodes
-       nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}).ToProto()
-       nodeRes2 := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 5}).ToProto()
        node1ID := "node-1"
-       node1 := objects.NewNode(&si.NodeInfo{NodeID: node1ID, SchedulableResource: nodeRes})
        node2ID := "node-2"
-       node2 := objects.NewNode(&si.NodeInfo{NodeID: node2ID, SchedulableResource: nodeRes2})
-
-       err = partition.AddNode(node1, nil)
-       assert.NilError(t, err, "add node to partition should not have failed")
-       err = partition.AddNode(node2, nil)
-       assert.NilError(t, err, "add node to partition should not have failed")
+       node1 := addNode(t, partition, node1ID, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
+       node2 := addNode(t, partition, node2ID, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 5}))
 
        // get nodes utilization
        getNodeUtilisation(resp, req)
@@ -786,6 +779,22 @@ func TestGetNodeUtilisation(t *testing.T) {
        assert.Assert(t, confirmNodeCount(utilisation.NodesUtil, 1), "unexpected number of nodes returned should be 1")
 }
 
+func addNode(t *testing.T, partition *scheduler.PartitionContext, nodeId string, resource *resources.Resource) *objects.Node {
+       nodeRes := resource.ToProto()
+       node := objects.NewNode(&si.NodeInfo{NodeID: nodeId, SchedulableResource: nodeRes})
+       err := partition.AddNode(node, nil)
+       assert.NilError(t, err, "adding node to partition should not fail")
+       return node
+}
+
+func addAllocatedResource(t *testing.T, node *objects.Node, allocationKey string, appID string, quantityMap map[string]resources.Quantity) {
+       t.Helper()
+       resAlloc := resources.NewResourceFromMap(quantityMap)
+       ask := objects.NewAllocationAsk(allocationKey, appID, resAlloc)
+       alloc := objects.NewAllocation(node.NodeID, ask)
+       assert.Assert(t, node.AddAllocation(alloc), "unexpected failure adding allocation to node")
+}
+
 func confirmNodeCount(info []*dao.NodeUtilDAOInfo, count int64) bool {
        var total int64
        for _, node := range info {
@@ -803,6 +812,127 @@ func addAndConfirmApplicationExists(t *testing.T, partitionName string, partitio
        return app
 }
 
+func TestGetPartitionNodesUtilJSON(t *testing.T) {
+       // setup
+       partition := setup(t, configDefault, 1)
+       appID := "app1"
+       node1ID := "node-1"
+       node2ID := "node-2"
+       node3ID := "node-3"
+
+       // create test nodes
+       node1 := addNode(t, partition, node1ID, resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 1000, siCommon.CPU: 1000}))
+       node2 := addNode(t, partition, node2ID, resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 1000, siCommon.CPU: 1000, "GPU": 10}))
+       addNode(t, partition, node3ID, resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.CPU: 1000}))
+
+       // create test allocations
+       addAllocatedResource(t, node1, "alloc-1", appID, map[string]resources.Quantity{siCommon.Memory: 500, siCommon.CPU: 300})
+       addAllocatedResource(t, node2, "alloc-2", appID, map[string]resources.Quantity{siCommon.Memory: 300, siCommon.CPU: 500, "GPU": 5})
+
+       // assert partition nodes utilization
+       result := getPartitionNodesUtilJSON(partition)
+       assert.Equal(t, result.ClusterId, rmID)
+       assert.Equal(t, result.Partition, "default")
+       assert.Equal(t, len(result.NodesUtilList), 3, "Should have 3 resource types(CPU/Memory/GPU) in the list.")
+
+       // two nodes advertise memory: must show up in the list
+       memoryNodesUtil := getNodesUtilByType(t, result.NodesUtilList, siCommon.Memory)
+       assert.Equal(t, memoryNodesUtil.NodesUtil[2].NumOfNodes, int64(1))
+       assert.Equal(t, memoryNodesUtil.NodesUtil[4].NumOfNodes, int64(1))
+       assert.Equal(t, memoryNodesUtil.NodesUtil[2].NodeNames[0], node2ID)
+       assert.Equal(t, memoryNodesUtil.NodesUtil[4].NodeNames[0], node1ID)
+
+       // three nodes advertise cpu: must show up in the list
+       cpuNodesUtil := getNodesUtilByType(t, result.NodesUtilList, siCommon.CPU)
+       assert.Equal(t, cpuNodesUtil.NodesUtil[0].NumOfNodes, int64(1))
+       assert.Equal(t, cpuNodesUtil.NodesUtil[0].NodeNames[0], node3ID)
+       assert.Equal(t, cpuNodesUtil.NodesUtil[2].NumOfNodes, int64(1))
+       assert.Equal(t, cpuNodesUtil.NodesUtil[2].NodeNames[0], node1ID)
+       assert.Equal(t, cpuNodesUtil.NodesUtil[4].NumOfNodes, int64(1))
+       assert.Equal(t, cpuNodesUtil.NodesUtil[4].NodeNames[0], node2ID)
+
+       // one node advertise GPU: must show up in the list
+       gpuNodesUtil := getNodesUtilByType(t, result.NodesUtilList, "GPU")
+       assert.Equal(t, gpuNodesUtil.NodesUtil[4].NumOfNodes, int64(1))
+       assert.Equal(t, gpuNodesUtil.NodesUtil[4].NodeNames[0], node2ID)
+}
+
+func TestGetNodeUtilisations(t *testing.T) {
+       // setup
+       NewWebApp(&scheduler.ClusterContext{}, nil)
+       req, err := http.NewRequest("GET", "/ws/v1/scheduler/node-utilizations", strings.NewReader(""))
+       assert.NilError(t, err, "Get node utilisations Handler request failed")
+       resp := &MockResponseWriter{}
+
+       getNodeUtilisations(resp, req)
+       var partitionNodesUtilDAOInfo []*dao.PartitionNodesUtilDAOInfo
+       err = json.Unmarshal(resp.outputBytes, &partitionNodesUtilDAOInfo)
+       assert.NilError(t, err, "should decode a empty list of *dao.PartitionNodesUtilDAOInfo")
+       assert.Equal(t, len(partitionNodesUtilDAOInfo), 0)
+
+       // setup partitions
+       schedulerContext, err = scheduler.NewClusterContext(rmID, policyGroup, []byte(configMultiPartitions))
+       assert.NilError(t, err, "Error when load clusterInfo from config")
+       schedulerContext.GetPartition("default")
+       defaultPartition := schedulerContext.GetPartition(common.GetNormalizedPartitionName("default", rmID))
+       gpuPartition := schedulerContext.GetPartition(common.GetNormalizedPartitionName("gpu", rmID))
+
+       // add nodes to partitions
+       node1 := addNode(t, defaultPartition, "node-1", resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10}))
+       node2 := addNode(t, defaultPartition, "node-2", resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10, "vcore": 5}))
+       node3 := addNode(t, defaultPartition, "node-3", resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 20, "vcore": 15}))
+       node4 := addNode(t, gpuPartition, "node-4", resources.NewResourceFromMap(map[string]resources.Quantity{"gpu": 10}))
+       // add allocatedResource to nodes
+       addAllocatedResource(t, node1, "alloc-1", "app-1", map[string]resources.Quantity{"memory": 1})
+       addAllocatedResource(t, node2, "alloc-1", "app-1", map[string]resources.Quantity{"memory": 1, "vcore": 1})
+       addAllocatedResource(t, node3, "alloc-1", "app-1", map[string]resources.Quantity{"memory": 1, "vcore": 1})
+       addAllocatedResource(t, node4, "alloc-1", "app-1", map[string]resources.Quantity{"gpu": 1})
+
+       // get nodes utilizations
+       getNodeUtilisations(resp, req)
+       err = json.Unmarshal(resp.outputBytes, &partitionNodesUtilDAOInfo)
+       assert.NilError(t, err, "should decode a list of *dao.PartitionNodesUtilDAOInfo")
+       assert.Equal(t, len(partitionNodesUtilDAOInfo), 2)
+       assert.Equal(t, partitionNodesUtilDAOInfo[0].ClusterId, rmID)
+       assert.Equal(t, partitionNodesUtilDAOInfo[1].ClusterId, rmID)
+
+       defaultPartitionNodesUtilDAOInfo := partitionNodesUtilDAOInfo[0]
+       gpuPartitionNodesUtilDAOInfo := partitionNodesUtilDAOInfo[1]
+       if defaultPartitionNodesUtilDAOInfo.Partition == "gpu" {
+               defaultPartitionNodesUtilDAOInfo = partitionNodesUtilDAOInfo[1]
+               gpuPartitionNodesUtilDAOInfo = partitionNodesUtilDAOInfo[0]
+       }
+
+       assert.Equal(t, len(defaultPartitionNodesUtilDAOInfo.NodesUtilList), 2)
+       assert.Equal(t, len(gpuPartitionNodesUtilDAOInfo.NodesUtilList), 1)
+
+       assertNodeUtilisationContent(t, defaultPartitionNodesUtilDAOInfo, "memory", 3)
+       assertNodeUtilisationContent(t, defaultPartitionNodesUtilDAOInfo, "vcore", 2)
+       assertNodeUtilisationContent(t, gpuPartitionNodesUtilDAOInfo, "gpu", 1)
+}
+
+func assertNodeUtilisationContent(t *testing.T, partitionNodesUtilDAOInfo *dao.PartitionNodesUtilDAOInfo, resourceType string, expectedNodeCount int) {
+       t.Helper()
+       nodeUtilisation := getNodesUtilByType(t, partitionNodesUtilDAOInfo.NodesUtilList, resourceType)
+       assert.Equal(t, nodeUtilisation.ResourceType, resourceType, fmt.Sprintf("should have returned '%s', but got '%s'", resourceType, nodeUtilisation.ResourceType))
+       assert.Equal(t, len(nodeUtilisation.NodesUtil), 10, fmt.Sprintf("should have 10 bucket, but got %d", len(nodeUtilisation.NodesUtil)))
+       assert.Assert(t,
+               confirmNodeCount(nodeUtilisation.NodesUtil, int64(expectedNodeCount)),
+               fmt.Sprintf("unexpected number of nodes returned, should be %d", expectedNodeCount),
+       )
+}
+
+func getNodesUtilByType(t *testing.T, nodesUtilList []*dao.NodesUtilDAOInfo, resourceType string) *dao.NodesUtilDAOInfo {
+       t.Helper()
+       for _, nodesUtil := range nodesUtilList {
+               if nodesUtil.ResourceType == resourceType {
+                       return nodesUtil
+               }
+       }
+       t.Fatalf("should have returned a *dao.NodesUtilDAOInfo with resourceType %s", resourceType)
+       return nil
+}
+
 func TestPartitions(t *testing.T) {
        defaultPartition := setup(t, configMultiPartitions, 2)
        partitionName := defaultPartition.Name
diff --git a/pkg/webservice/routes.go b/pkg/webservice/routes.go
index 75e2230f..633fa78c 100644
--- a/pkg/webservice/routes.go
+++ b/pkg/webservice/routes.go
@@ -265,10 +265,17 @@ var webRoutes = routes{
                "/ws/v1/scheduler/healthcheck",
                checkHealthStatus,
        },
+       // Deprecated - To be removed in next major release. Replaced with /ws/v1/scheduler/node-utilizations
        route{
                "Scheduler",
                "GET",
                "/ws/v1/scheduler/node-utilization",
                getNodeUtilisation,
        },
+       route{
+               "Scheduler",
+               "GET",
+               "/ws/v1/scheduler/node-utilizations",
+               getNodeUtilisations,
+       },
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to