This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new d8be4933 [YUNIKORN-2257] Add rest API to retrieve node utilization for
multiple resource types (#762)
d8be4933 is described below
commit d8be49331e3b8f5b221d05e6d216056d05337ce6
Author: Yu-Lin Chen <[email protected]>
AuthorDate: Sun Jan 7 03:00:58 2024 +0800
[YUNIKORN-2257] Add rest API to retrieve node utilization for multiple
resource types (#762)
Closes: #762
Signed-off-by: Chia-Ping Tsai <[email protected]>
---
pkg/webservice/dao/node_util.go | 6 ++
pkg/webservice/handlers.go | 77 ++++++++++++++++++++-
pkg/webservice/handlers_test.go | 148 +++++++++++++++++++++++++++++++++++++---
pkg/webservice/routes.go | 7 ++
4 files changed, 228 insertions(+), 10 deletions(-)
diff --git a/pkg/webservice/dao/node_util.go b/pkg/webservice/dao/node_util.go
index 0cac3c29..46ad7cd5 100644
--- a/pkg/webservice/dao/node_util.go
+++ b/pkg/webservice/dao/node_util.go
@@ -18,6 +18,12 @@
package dao
+type PartitionNodesUtilDAOInfo struct {
+ ClusterId string `json:"clusterId"` // no omitempty, cluster id should not be empty
+ Partition string `json:"partition"` // no omitempty, partition should not be empty
+ NodesUtilList []*NodesUtilDAOInfo `json:"utilizations,omitempty"`
+}
+
type NodesUtilDAOInfo struct {
ResourceType string `json:"type,omitempty"`
NodesUtil []*NodeUtilDAOInfo `json:"utilization,omitempty"`
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index 46bbb760..69c781f0 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -375,7 +375,8 @@ func getNodesDAO(entries []*objects.Node)
[]*dao.NodeDAOInfo {
// getNodeUtilisation loads the node utilisation based on the dominant resource used
// for the default partition. Dominant resource is defined as the highest utilised resource
// type on the root queue based on the registered resources.
-// Use the default partition until we get YUNIKORN-2088 fixed
+// Only check the default partition
+// Deprecated - To be removed in next major release. Replaced with getNodesUtilisations
func getNodeUtilisation(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
partitionContext :=
schedulerContext.GetPartitionWithoutClusterID(configs.DefaultPartition)
@@ -400,6 +401,7 @@ func getNodeUtilisation(w http.ResponseWriter, r
*http.Request) {
}
// getNodesUtilJSON loads the nodes utilisation for a partition for a specific resource type.
+// Deprecated - To be removed in next major release. Replaced with getPartitionNodesUtilJSON
func getNodesUtilJSON(partition *scheduler.PartitionContext, name string)
*dao.NodesUtilDAOInfo {
mapResult := make([]int, 10)
mapName := make([][]string, 10)
@@ -439,6 +441,79 @@ func getNodesUtilJSON(partition
*scheduler.PartitionContext, name string) *dao.N
}
}
+func getNodeUtilisations(w http.ResponseWriter, r *http.Request) {
+ writeHeaders(w)
+ var result []*dao.PartitionNodesUtilDAOInfo
+ for _, part := range schedulerContext.GetPartitionMapClone() {
+ result = append(result, getPartitionNodesUtilJSON(part))
+ }
+
+ if err := json.NewEncoder(w).Encode(result); err != nil {
+ buildJSONErrorResponse(w, err.Error(),
http.StatusInternalServerError)
+ }
+}
+
+// getPartitionNodesUtilJSON retrieves the utilization of all resource types for nodes within a specific partition.
+func getPartitionNodesUtilJSON(partition *scheduler.PartitionContext)
*dao.PartitionNodesUtilDAOInfo {
+ type UtilizationBucket struct {
+ NodeCount []int // 10 buckets, each bucket contains number of nodes
+ NodeList [][]string // 10 buckets, each bucket contains node name list
+ }
+ resourceBuckets := make(map[string]*UtilizationBucket) // key is resource type, value is UtilizationBucket
+
+ // put nodes to buckets
+ for _, node := range partition.GetNodes() {
+ capacity := node.GetCapacity()
+ resourceAllocated := node.GetAllocatedResource()
+ absUsedCapacity := resources.CalculateAbsUsedCapacity(capacity,
resourceAllocated)
+
+ // append to bucket based on resource type, only count if node advertises the resource
+ for resourceType := range capacity.Resources {
+ idx := 0
+ if absValue, ok :=
absUsedCapacity.Resources[resourceType]; ok {
+ v := float64(absValue)
+ idx = int(math.Dim(math.Ceil(v/10), 1))
+ }
+
+ // create resource bucket if not exist
+ if _, ok := resourceBuckets[resourceType]; !ok {
+ resourceBuckets[resourceType] =
&UtilizationBucket{
+ NodeCount: make([]int, 10),
+ NodeList: make([][]string, 10),
+ }
+ }
+
+ resourceBuckets[resourceType].NodeCount[idx]++
+ resourceBuckets[resourceType].NodeList[idx] =
append(resourceBuckets[resourceType].NodeList[idx], node.NodeID)
+ }
+ }
+
+ // build result
+ var nodesUtilList []*dao.NodesUtilDAOInfo
+ for resourceType, bucket := range resourceBuckets {
+ var nodesUtil []*dao.NodeUtilDAOInfo
+ for k := 0; k < 10; k++ {
+ util := &dao.NodeUtilDAOInfo{
+ BucketName: fmt.Sprintf("%d", k*10) + "-" +
fmt.Sprintf("%d", (k+1)*10) + "%",
+ NumOfNodes: int64(bucket.NodeCount[k]),
+ NodeNames: bucket.NodeList[k],
+ }
+ nodesUtil = append(nodesUtil, util)
+ }
+ nodeUtilization := &dao.NodesUtilDAOInfo{
+ ResourceType: resourceType,
+ NodesUtil: nodesUtil,
+ }
+ nodesUtilList = append(nodesUtilList, nodeUtilization)
+ }
+
+ return &dao.PartitionNodesUtilDAOInfo{
+ ClusterId: partition.RmID,
+ Partition:
common.GetPartitionNameWithoutClusterID(partition.Name),
+ NodesUtilList: nodesUtilList,
+ }
+}
+
func getApplicationHistory(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index 406261c5..9718ed9c 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -732,17 +732,10 @@ func TestGetNodeUtilisation(t *testing.T) {
assert.Assert(t, confirmNodeCount(utilisation.NodesUtil, 0),
"unexpected number of nodes returned should be 0")
// create test nodes
- nodeRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10}).ToProto()
- nodeRes2 :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10,
"second": 5}).ToProto()
node1ID := "node-1"
- node1 := objects.NewNode(&si.NodeInfo{NodeID: node1ID,
SchedulableResource: nodeRes})
node2ID := "node-2"
- node2 := objects.NewNode(&si.NodeInfo{NodeID: node2ID,
SchedulableResource: nodeRes2})
-
- err = partition.AddNode(node1, nil)
- assert.NilError(t, err, "add node to partition should not have failed")
- err = partition.AddNode(node2, nil)
- assert.NilError(t, err, "add node to partition should not have failed")
+ node1 := addNode(t, partition, node1ID,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
+ node2 := addNode(t, partition, node2ID,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10,
"second": 5}))
// get nodes utilization
getNodeUtilisation(resp, req)
@@ -786,6 +779,22 @@ func TestGetNodeUtilisation(t *testing.T) {
assert.Assert(t, confirmNodeCount(utilisation.NodesUtil, 1),
"unexpected number of nodes returned should be 1")
}
+func addNode(t *testing.T, partition *scheduler.PartitionContext, nodeId
string, resource *resources.Resource) *objects.Node {
+ nodeRes := resource.ToProto()
+ node := objects.NewNode(&si.NodeInfo{NodeID: nodeId,
SchedulableResource: nodeRes})
+ err := partition.AddNode(node, nil)
+ assert.NilError(t, err, "adding node to partition should not fail")
+ return node
+}
+
+func addAllocatedResource(t *testing.T, node *objects.Node, allocationKey
string, appID string, quantityMap map[string]resources.Quantity) {
+ t.Helper()
+ resAlloc := resources.NewResourceFromMap(quantityMap)
+ ask := objects.NewAllocationAsk(allocationKey, appID, resAlloc)
+ alloc := objects.NewAllocation(node.NodeID, ask)
+ assert.Assert(t, node.AddAllocation(alloc), "unexpected failure adding
allocation to node")
+}
+
func confirmNodeCount(info []*dao.NodeUtilDAOInfo, count int64) bool {
var total int64
for _, node := range info {
@@ -803,6 +812,127 @@ func addAndConfirmApplicationExists(t *testing.T,
partitionName string, partitio
return app
}
+func TestGetPartitionNodesUtilJSON(t *testing.T) {
+ // setup
+ partition := setup(t, configDefault, 1)
+ appID := "app1"
+ node1ID := "node-1"
+ node2ID := "node-2"
+ node3ID := "node-3"
+
+ // create test nodes
+ node1 := addNode(t, partition, node1ID,
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory:
1000, siCommon.CPU: 1000}))
+ node2 := addNode(t, partition, node2ID,
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory:
1000, siCommon.CPU: 1000, "GPU": 10}))
+ addNode(t, partition, node3ID,
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.CPU: 1000}))
+
+ // create test allocations
+ addAllocatedResource(t, node1, "alloc-1", appID,
map[string]resources.Quantity{siCommon.Memory: 500, siCommon.CPU: 300})
+ addAllocatedResource(t, node2, "alloc-2", appID,
map[string]resources.Quantity{siCommon.Memory: 300, siCommon.CPU: 500, "GPU":
5})
+
+ // assert partition nodes utilization
+ result := getPartitionNodesUtilJSON(partition)
+ assert.Equal(t, result.ClusterId, rmID)
+ assert.Equal(t, result.Partition, "default")
+ assert.Equal(t, len(result.NodesUtilList), 3, "Should have 3 resource
types(CPU/Memory/GPU) in the list.")
+
+ // two nodes advertise memory: must show up in the list
+ memoryNodesUtil := getNodesUtilByType(t, result.NodesUtilList,
siCommon.Memory)
+ assert.Equal(t, memoryNodesUtil.NodesUtil[2].NumOfNodes, int64(1))
+ assert.Equal(t, memoryNodesUtil.NodesUtil[4].NumOfNodes, int64(1))
+ assert.Equal(t, memoryNodesUtil.NodesUtil[2].NodeNames[0], node2ID)
+ assert.Equal(t, memoryNodesUtil.NodesUtil[4].NodeNames[0], node1ID)
+
+ // three nodes advertise cpu: must show up in the list
+ cpuNodesUtil := getNodesUtilByType(t, result.NodesUtilList,
siCommon.CPU)
+ assert.Equal(t, cpuNodesUtil.NodesUtil[0].NumOfNodes, int64(1))
+ assert.Equal(t, cpuNodesUtil.NodesUtil[0].NodeNames[0], node3ID)
+ assert.Equal(t, cpuNodesUtil.NodesUtil[2].NumOfNodes, int64(1))
+ assert.Equal(t, cpuNodesUtil.NodesUtil[2].NodeNames[0], node1ID)
+ assert.Equal(t, cpuNodesUtil.NodesUtil[4].NumOfNodes, int64(1))
+ assert.Equal(t, cpuNodesUtil.NodesUtil[4].NodeNames[0], node2ID)
+
+ // one node advertise GPU: must show up in the list
+ gpuNodesUtil := getNodesUtilByType(t, result.NodesUtilList, "GPU")
+ assert.Equal(t, gpuNodesUtil.NodesUtil[4].NumOfNodes, int64(1))
+ assert.Equal(t, gpuNodesUtil.NodesUtil[4].NodeNames[0], node2ID)
+}
+
+func TestGetNodeUtilisations(t *testing.T) {
+ // setup
+ NewWebApp(&scheduler.ClusterContext{}, nil)
+ req, err := http.NewRequest("GET",
"/ws/v1/scheduler/node-utilizations", strings.NewReader(""))
+ assert.NilError(t, err, "Get node utilisations Handler request failed")
+ resp := &MockResponseWriter{}
+
+ getNodeUtilisations(resp, req)
+ var partitionNodesUtilDAOInfo []*dao.PartitionNodesUtilDAOInfo
+ err = json.Unmarshal(resp.outputBytes, &partitionNodesUtilDAOInfo)
+ assert.NilError(t, err, "should decode a empty list of
*dao.PartitionNodesUtilDAOInfo")
+ assert.Equal(t, len(partitionNodesUtilDAOInfo), 0)
+
+ // setup partitions
+ schedulerContext, err = scheduler.NewClusterContext(rmID, policyGroup,
[]byte(configMultiPartitions))
+ assert.NilError(t, err, "Error when load clusterInfo from config")
+ schedulerContext.GetPartition("default")
+ defaultPartition :=
schedulerContext.GetPartition(common.GetNormalizedPartitionName("default",
rmID))
+ gpuPartition :=
schedulerContext.GetPartition(common.GetNormalizedPartitionName("gpu", rmID))
+
+ // add nodes to partitions
+ node1 := addNode(t, defaultPartition, "node-1",
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10}))
+ node2 := addNode(t, defaultPartition, "node-2",
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10,
"vcore": 5}))
+ node3 := addNode(t, defaultPartition, "node-3",
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 20,
"vcore": 15}))
+ node4 := addNode(t, gpuPartition, "node-4",
resources.NewResourceFromMap(map[string]resources.Quantity{"gpu": 10}))
+ // add allocatedResource to nodes
+ addAllocatedResource(t, node1, "alloc-1", "app-1",
map[string]resources.Quantity{"memory": 1})
+ addAllocatedResource(t, node2, "alloc-1", "app-1",
map[string]resources.Quantity{"memory": 1, "vcore": 1})
+ addAllocatedResource(t, node3, "alloc-1", "app-1",
map[string]resources.Quantity{"memory": 1, "vcore": 1})
+ addAllocatedResource(t, node4, "alloc-1", "app-1",
map[string]resources.Quantity{"gpu": 1})
+
+ // get nodes utilizations
+ getNodeUtilisations(resp, req)
+ err = json.Unmarshal(resp.outputBytes, &partitionNodesUtilDAOInfo)
+ assert.NilError(t, err, "should decode a list of
*dao.PartitionNodesUtilDAOInfo")
+ assert.Equal(t, len(partitionNodesUtilDAOInfo), 2)
+ assert.Equal(t, partitionNodesUtilDAOInfo[0].ClusterId, rmID)
+ assert.Equal(t, partitionNodesUtilDAOInfo[1].ClusterId, rmID)
+
+ defaultPartitionNodesUtilDAOInfo := partitionNodesUtilDAOInfo[0]
+ gpuPartitionNodesUtilDAOInfo := partitionNodesUtilDAOInfo[1]
+ if defaultPartitionNodesUtilDAOInfo.Partition == "gpu" {
+ defaultPartitionNodesUtilDAOInfo = partitionNodesUtilDAOInfo[1]
+ gpuPartitionNodesUtilDAOInfo = partitionNodesUtilDAOInfo[0]
+ }
+
+ assert.Equal(t, len(defaultPartitionNodesUtilDAOInfo.NodesUtilList), 2)
+ assert.Equal(t, len(gpuPartitionNodesUtilDAOInfo.NodesUtilList), 1)
+
+ assertNodeUtilisationContent(t, defaultPartitionNodesUtilDAOInfo,
"memory", 3)
+ assertNodeUtilisationContent(t, defaultPartitionNodesUtilDAOInfo,
"vcore", 2)
+ assertNodeUtilisationContent(t, gpuPartitionNodesUtilDAOInfo, "gpu", 1)
+}
+
+func assertNodeUtilisationContent(t *testing.T, partitionNodesUtilDAOInfo
*dao.PartitionNodesUtilDAOInfo, resourceType string, expectedNodeCount int) {
+ t.Helper()
+ nodeUtilisation := getNodesUtilByType(t,
partitionNodesUtilDAOInfo.NodesUtilList, resourceType)
+ assert.Equal(t, nodeUtilisation.ResourceType, resourceType,
fmt.Sprintf("should have returned '%s', but got '%s'", resourceType,
nodeUtilisation.ResourceType))
+ assert.Equal(t, len(nodeUtilisation.NodesUtil), 10, fmt.Sprintf("should
have 10 bucket, but got %d", len(nodeUtilisation.NodesUtil)))
+ assert.Assert(t,
+ confirmNodeCount(nodeUtilisation.NodesUtil,
int64(expectedNodeCount)),
+ fmt.Sprintf("unexpected number of nodes returned, should be
%d", expectedNodeCount),
+ )
+}
+
+func getNodesUtilByType(t *testing.T, nodesUtilList []*dao.NodesUtilDAOInfo,
resourceType string) *dao.NodesUtilDAOInfo {
+ t.Helper()
+ for _, nodesUtil := range nodesUtilList {
+ if nodesUtil.ResourceType == resourceType {
+ return nodesUtil
+ }
+ }
+ t.Fatalf("should have returned a *dao.NodesUtilDAOInfo with
resourceType %s", resourceType)
+ return nil
+}
+
func TestPartitions(t *testing.T) {
defaultPartition := setup(t, configMultiPartitions, 2)
partitionName := defaultPartition.Name
diff --git a/pkg/webservice/routes.go b/pkg/webservice/routes.go
index 75e2230f..633fa78c 100644
--- a/pkg/webservice/routes.go
+++ b/pkg/webservice/routes.go
@@ -265,10 +265,17 @@ var webRoutes = routes{
"/ws/v1/scheduler/healthcheck",
checkHealthStatus,
},
+ // Deprecated - To be removed in next major release. Replaced with /ws/v1/scheduler/node-utilizations
route{
"Scheduler",
"GET",
"/ws/v1/scheduler/node-utilization",
getNodeUtilisation,
},
+ route{
+ "Scheduler",
+ "GET",
+ "/ws/v1/scheduler/node-utilizations",
+ getNodeUtilisations,
+ },
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]