This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 0fd74361 [YUNIKORN-2087] reinstate node utilisation rest endpoint
(#691)
0fd74361 is described below
commit 0fd743614b973650fc3fa24488be240e2e0b3121
Author: Wilfred Spiegelenburg <[email protected]>
AuthorDate: Thu Nov 9 13:58:54 2023 +1100
[YUNIKORN-2087] reinstate node utilisation rest endpoint (#691)
The node utilisation endpoint is required to be exposed for the
dashboard to work after YUNIKORN-325
Moving the endpoint to /ws/v1/scheduler/node-utilization
The endpoint will need to change as part of YUNIKORN-2088
Dominant resource is calculated for the root queue and that utilisation
is shown for the nodes. The dominant resource type is the type with the
highest usage ratio. A pre-defined ratio for zero usage and capacity is
defined as: 0 capacity, 0 usage: 0; 0 capacity, >0 usage: 1; otherwise
ratio = usage / capacity.
Closes: #691
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
pkg/common/resources/resources.go | 41 +++++++++
pkg/common/resources/resources_test.go | 26 ++++++
pkg/webservice/handlers.go | 76 ++++++++++++-----
pkg/webservice/handlers_test.go | 151 ++++++++++++++++++++++++++-------
pkg/webservice/routes.go | 6 ++
pkg/webservice/webservice.go | 4 +-
6 files changed, 251 insertions(+), 53 deletions(-)
diff --git a/pkg/common/resources/resources.go
b/pkg/common/resources/resources.go
index 2aee8c04..bf96e98f 100644
--- a/pkg/common/resources/resources.go
+++ b/pkg/common/resources/resources.go
@@ -993,3 +993,44 @@ func CalculateAbsUsedCapacity(capacity, used *Resource)
*Resource {
}
return absResource
}
+
+// DominantResourceType calculates the most used resource type based on the
ratio of used compared to
+// the capacity. If a capacity type is set to 0 assume full usage.
+// Dominant type should be calculated with queue usage and capacity. Queue
capacities should never
+// contain 0 values when there is a usage also, however in the root queue this
could happen if the
+// last node reporting that resource was removed but not everything has been
updated immediately.
+// Ignores resources types that are used but not defined in the capacity.
+func (r *Resource) DominantResourceType(capacity *Resource) string {
+ if r == nil || capacity == nil {
+ return ""
+ }
+ var div, temp float64
+ dominant := ""
+ for name, usedVal := range r.Resources {
+ capVal, ok := capacity.Resources[name]
+ if !ok {
+ log.Log(log.Resources).Debug("missing resource in
dominant calculation",
+ zap.String("missing resource", name))
+ continue
+ }
+ // calculate the ratio between usage and capacity
+ // ratio should be somewhere between 0 and 1, but do not
restrict
+ // handle 0 values specifically just to be safe should never
happen
+ if capVal == 0 {
+ if usedVal == 0 {
+ temp = 0 // no usage, no cap: consider empty
+ } else {
+ temp = 1 // usage, no cap: fully used
+ }
+ } else {
+ temp = float64(usedVal) / float64(capVal) // both not
zero calculate ratio
+ }
+ // if we have exactly the same use the latest one
+ if temp >= div {
+ div = temp
+ dominant = name
+ }
+ }
+ return dominant
+}
diff --git a/pkg/common/resources/resources_test.go
b/pkg/common/resources/resources_test.go
index 8460017c..5c32095c 100644
--- a/pkg/common/resources/resources_test.go
+++ b/pkg/common/resources/resources_test.go
@@ -1701,3 +1701,29 @@ func TestIsEmpty(t *testing.T) {
})
}
}
+
+func TestResource_DominantResource(t *testing.T) {
+ tests := []struct {
+ name string
+ used *Resource
+ capacity *Resource
+ wantName string
+ }{
+ {"nil receiver", nil, Zero, ""},
+ {"nil cap", Zero, nil, ""},
+ {"zero cap", NewResourceFromMap(map[string]Quantity{"A": 10}),
Zero, ""},
+ {"over cap", NewResourceFromMap(map[string]Quantity{"A": 20}),
NewResourceFromMap(map[string]Quantity{"A": 10}), "A"},
+ {"zero usage exist",
NewResourceFromMap(map[string]Quantity{"A": 0}),
NewResourceFromMap(map[string]Quantity{"A": 10}), "A"},
+ {"usage not in cap",
NewResourceFromMap(map[string]Quantity{"B": 10}),
NewResourceFromMap(map[string]Quantity{"A": 10}), ""},
+ {"multiple usages", NewResourceFromMap(map[string]Quantity{"B":
10, "A": 10}), NewResourceFromMap(map[string]Quantity{"A": 10, "B": 20}), "A"},
+ {"0 usage with 0 cap",
NewResourceFromMap(map[string]Quantity{"A": 0}),
NewResourceFromMap(map[string]Quantity{"A": 0}), "A"},
+ {"usage with 0 cap",
NewResourceFromMap(map[string]Quantity{"A": 10, "B": 5}),
NewResourceFromMap(map[string]Quantity{"A": 0, "B": 10}), "A"},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := tt.used.DominantResourceType(tt.capacity);
got != tt.wantName {
+ t.Errorf("DominantResourceType() = %v, want
%v", got, tt.wantName)
+ }
+ })
+ }
+}
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index 4192093b..92cc472e 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -48,15 +48,17 @@ import (
"github.com/apache/yunikorn-core/pkg/webservice/dao"
)
-const PartitionDoesNotExists = "Partition not found"
-const MissingParamsName = "Missing parameters"
-const QueueDoesNotExists = "Queue not found"
-const UserDoesNotExists = "User not found"
-const GroupDoesNotExists = "Group not found"
-const UserNameMissing = "User name is missing"
-const GroupNameMissing = "Group name is missing"
-const ApplicationDoesNotExists = "Application not found"
-const NodeDoesNotExists = "Node not found"
+const (
+ PartitionDoesNotExists = "Partition not found"
+ MissingParamsName = "Missing parameters"
+ QueueDoesNotExists = "Queue not found"
+ UserDoesNotExists = "User not found"
+ GroupDoesNotExists = "Group not found"
+ UserNameMissing = "User name is missing"
+ GroupNameMissing = "Group name is missing"
+ ApplicationDoesNotExists = "Application not found"
+ NodeDoesNotExists = "Node not found"
+)
func getStackInfo(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
@@ -146,7 +148,7 @@ func getClusterJSON(partition *scheduler.PartitionContext)
*dao.ClusterDAOInfo {
func getClusterUtilJSON(partition *scheduler.PartitionContext)
[]*dao.ClusterUtilDAOInfo {
var utils []*dao.ClusterUtilDAOInfo
- var getResource bool = true
+ var getResource = true
total := partition.GetTotalPartitionResource()
if resources.IsZero(total) {
getResource = false
@@ -162,7 +164,7 @@ func getClusterUtilJSON(partition
*scheduler.PartitionContext) []*dao.ClusterUti
ResourceType: name,
Total: int64(total.Resources[name]),
Used: int64(used.Resources[name]),
- Usage: fmt.Sprintf("%d", int64(value)) +
"%",
+ Usage: fmt.Sprintf("%d%%", int64(value)),
}
utils = append(utils, utilization)
}
@@ -348,29 +350,57 @@ func getNodesDAO(entries []*objects.Node)
[]*dao.NodeDAOInfo {
return nodesDAO
}
+// getNodeUtilisation loads the node utilisation based on the dominant
resource used
+// for the default partition. Dominant resource is defined as the highest
utilised resource
+// type on the root queue based on the registered resources.
+// Use the default partition until we get YUNIKORN-2088 fixed
+func getNodeUtilisation(w http.ResponseWriter, r *http.Request) {
+ writeHeaders(w)
+ partitionContext :=
schedulerContext.GetPartitionWithoutClusterID(configs.DefaultPartition)
+ if partitionContext == nil {
+ buildJSONErrorResponse(w, PartitionDoesNotExists,
http.StatusBadRequest)
+ return
+ }
+ // calculate the dominant resource based on root queue usage and size
+ rootQ := partitionContext.GetQueue(configs.RootQueue)
+ rootMax := rootQ.GetMaxResource()
+ // if no nodes have been registered return an empty object
+ nodesDao := &dao.NodesUtilDAOInfo{}
+ if !resources.IsZero(rootMax) {
+ // if nothing is used we get an empty dominant resource and
return an empty object
+ rootUsed := rootQ.GetAllocatedResource()
+ dominant := rootUsed.DominantResourceType(rootMax)
+ nodesDao = getNodesUtilJSON(partitionContext, dominant)
+ }
+ if err := json.NewEncoder(w).Encode(nodesDao); err != nil {
+ buildJSONErrorResponse(w, err.Error(),
http.StatusInternalServerError)
+ }
+}
+
+// getNodesUtilJSON loads the nodes utilisation for a partition for a specific
resource type.
func getNodesUtilJSON(partition *scheduler.PartitionContext, name string)
*dao.NodesUtilDAOInfo {
mapResult := make([]int, 10)
mapName := make([][]string, 10)
var v float64
var nodeUtil []*dao.NodeUtilDAOInfo
+ var idx int
for _, node := range partition.GetNodes() {
- resourceExist := true
- // check resource exist or not
+ // check resource exist or not: only count if node advertises
the resource
total := node.GetCapacity()
- if total.Resources[name] <= 0 {
- resourceExist = false
+ if _, ok := total.Resources[name]; !ok {
+ continue
}
resourceAllocated := node.GetAllocatedResource()
- if _, ok := resourceAllocated.Resources[name]; !ok {
- resourceExist = false
- }
- // if resource exist in node, record the bucket it should go
- if resourceExist {
+ // if resource exist in node, record the bucket it should go
into,
+ // otherwise none is used, and it should end up in the 0 bucket
+ if _, ok := resourceAllocated.Resources[name]; ok {
v = float64(resources.CalculateAbsUsedCapacity(total,
resourceAllocated).Resources[name])
- idx := int(math.Dim(math.Ceil(v/10), 1))
- mapResult[idx]++
- mapName[idx] = append(mapName[idx], node.NodeID)
+ idx = int(math.Dim(math.Ceil(v/10), 1))
+ } else {
+ idx = 0
}
+ mapResult[idx]++
+ mapName[idx] = append(mapName[idx], node.NodeID)
}
// put number of nodes and node name to different buckets
for k := 0; k < 10; k++ {
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index 136f6b1f..b88117ff 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -661,11 +661,14 @@ func TestGetNodesUtilJSON(t *testing.T) {
// create test nodes
nodeRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory:
1000, siCommon.CPU: 1000}).ToProto()
- nodeRes2 :=
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory:
1000, siCommon.CPU: 1000, "GPU": 10}).ToProto()
node1ID := "node-1"
node1 := objects.NewNode(&si.NodeInfo{NodeID: node1ID,
SchedulableResource: nodeRes})
node2ID := "node-2"
+ nodeRes2 :=
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory:
1000, siCommon.CPU: 1000, "GPU": 10}).ToProto()
node2 := objects.NewNode(&si.NodeInfo{NodeID: node2ID,
SchedulableResource: nodeRes2})
+ node3ID := "node-3"
+ nodeCPU :=
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.CPU:
1000}).ToProto()
+ node3 := objects.NewNode(&si.NodeInfo{NodeID: node3ID,
SchedulableResource: nodeCPU})
// create test allocations
resAlloc1 :=
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory:
500, siCommon.CPU: 300})
@@ -678,36 +681,126 @@ func TestGetNodesUtilJSON(t *testing.T) {
allocs = []*objects.Allocation{objects.NewAllocation("alloc-2-uuid",
node2ID, ask2)}
err = partition.AddNode(node2, allocs)
assert.NilError(t, err, "add node to partition should not have failed")
+ err = partition.AddNode(node3, nil)
+ assert.NilError(t, err, "add node to partition should not have failed")
+
+ // two nodes advertise memory: must show up in the list
+ result := getNodesUtilJSON(partition, siCommon.Memory)
+ subResult := result.NodesUtil
+ assert.Equal(t, result.ResourceType, siCommon.Memory)
+ assert.Equal(t, subResult[2].NumOfNodes, int64(1))
+ assert.Equal(t, subResult[4].NumOfNodes, int64(1))
+ assert.Equal(t, subResult[2].NodeNames[0], node2ID)
+ assert.Equal(t, subResult[4].NodeNames[0], node1ID)
+
+ // three nodes advertise cpu: must show up in the list
+ result = getNodesUtilJSON(partition, siCommon.CPU)
+ subResult = result.NodesUtil
+ assert.Equal(t, result.ResourceType, siCommon.CPU)
+ assert.Equal(t, subResult[0].NumOfNodes, int64(1))
+ assert.Equal(t, subResult[0].NodeNames[0], node3ID)
+ assert.Equal(t, subResult[2].NumOfNodes, int64(1))
+ assert.Equal(t, subResult[2].NodeNames[0], node1ID)
+ assert.Equal(t, subResult[4].NumOfNodes, int64(1))
+ assert.Equal(t, subResult[4].NodeNames[0], node2ID)
+
+ // one node advertise GPU: must show up in the list
+ result = getNodesUtilJSON(partition, "GPU")
+ subResult = result.NodesUtil
+ assert.Equal(t, result.ResourceType, "GPU")
+ assert.Equal(t, subResult[4].NumOfNodes, int64(1))
+ assert.Equal(t, subResult[4].NodeNames[0], node2ID)
+
+ result = getNodesUtilJSON(partition, "non-exist")
+ subResult = result.NodesUtil
+ assert.Equal(t, result.ResourceType, "non-exist")
+ assert.Equal(t, subResult[0].NumOfNodes, int64(0))
+ assert.Equal(t, len(subResult[0].NodeNames), 0)
+}
+
+func TestGetNodeUtilisation(t *testing.T) {
+ NewWebApp(&scheduler.ClusterContext{}, nil)
+
+ // var req *http.Request
+ req, err := http.NewRequest("GET", "/ws/v1/scheduler/node-utilization",
strings.NewReader(""))
+ assert.NilError(t, err, "Get node utilisation Handler request failed")
+ req = req.WithContext(context.TODO())
+ resp := &MockResponseWriter{}
+
+ getNodeUtilisation(resp, req)
+ var errInfo dao.YAPIError
+ err = json.Unmarshal(resp.outputBytes, &errInfo)
+ assert.NilError(t, err, "getNodeUtilisation should have returned and
error")
+
+ partition := setup(t, configDefault, 1)
+ utilisation := &dao.NodesUtilDAOInfo{}
+ err = json.Unmarshal(resp.outputBytes, utilisation)
+ assert.NilError(t, err, "getNodeUtilisation should have returned an
empty object")
+ assert.Equal(t, utilisation.ResourceType, "", "unexpected type
returned")
+ assert.Equal(t, len(utilisation.NodesUtil), 0, "no nodes should be
returned")
+ assert.Assert(t, confirmNodeCount(utilisation.NodesUtil, 0),
"unexpected number of nodes returned should be 0")
+
+ // create test nodes
+ nodeRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first":
10}).ToProto()
+ nodeRes2 :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10,
"second": 5}).ToProto()
+ node1ID := "node-1"
+ node1 := objects.NewNode(&si.NodeInfo{NodeID: node1ID,
SchedulableResource: nodeRes})
+ node2ID := "node-2"
+ node2 := objects.NewNode(&si.NodeInfo{NodeID: node2ID,
SchedulableResource: nodeRes2})
+
+ err = partition.AddNode(node1, nil)
+ assert.NilError(t, err, "add node to partition should not have failed")
+ err = partition.AddNode(node2, nil)
+ assert.NilError(t, err, "add node to partition should not have failed")
// get nodes utilization
- res1 := getNodesUtilJSON(partition, siCommon.Memory)
- res2 := getNodesUtilJSON(partition, siCommon.CPU)
- res3 := getNodesUtilJSON(partition, "GPU")
- resNon := getNodesUtilJSON(partition, "non-exist")
- subres1 := res1.NodesUtil
- subres2 := res2.NodesUtil
- subres3 := res3.NodesUtil
- subresNon := resNon.NodesUtil
-
- assert.Equal(t, res1.ResourceType, siCommon.Memory)
- assert.Equal(t, subres1[2].NumOfNodes, int64(1))
- assert.Equal(t, subres1[4].NumOfNodes, int64(1))
- assert.Equal(t, subres1[2].NodeNames[0], node2ID)
- assert.Equal(t, subres1[4].NodeNames[0], node1ID)
-
- assert.Equal(t, res2.ResourceType, siCommon.CPU)
- assert.Equal(t, subres2[2].NumOfNodes, int64(1))
- assert.Equal(t, subres2[4].NumOfNodes, int64(1))
- assert.Equal(t, subres2[2].NodeNames[0], node1ID)
- assert.Equal(t, subres2[4].NodeNames[0], node2ID)
-
- assert.Equal(t, res3.ResourceType, "GPU")
- assert.Equal(t, subres3[4].NumOfNodes, int64(1))
- assert.Equal(t, subres3[4].NodeNames[0], node2ID)
-
- assert.Equal(t, resNon.ResourceType, "non-exist")
- assert.Equal(t, subresNon[0].NumOfNodes, int64(0))
- assert.Equal(t, len(subresNon[0].NodeNames), 0)
+ getNodeUtilisation(resp, req)
+ utilisation = &dao.NodesUtilDAOInfo{}
+ err = json.Unmarshal(resp.outputBytes, utilisation)
+ assert.NilError(t, err, "getNodeUtilisation should have returned an
object")
+ assert.Equal(t, utilisation.ResourceType, "", "unexpected type
returned")
+ assert.Equal(t, len(utilisation.NodesUtil), 10, "empty usage:
unexpected bucket count returned")
+ assert.Assert(t, confirmNodeCount(utilisation.NodesUtil, 0),
"unexpected number of nodes returned should be 0")
+
+ resAlloc :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+ ask := objects.NewAllocationAsk("alloc-1", "app", resAlloc)
+ alloc := objects.NewAllocation("alloc-1-uuid", node1ID, ask)
+ assert.Assert(t, node1.AddAllocation(alloc), "unexpected failure adding
allocation to node")
+ rootQ := partition.GetQueue("root")
+ err = rootQ.IncAllocatedResource(resAlloc, false)
+ assert.NilError(t, err, "unexpected error returned setting allocated
resource on queue")
+ // get nodes utilization
+ getNodeUtilisation(resp, req)
+ utilisation = &dao.NodesUtilDAOInfo{}
+ err = json.Unmarshal(resp.outputBytes, utilisation)
+ assert.NilError(t, err, "getNodeUtilisation should have returned an
object")
+ assert.Equal(t, utilisation.ResourceType, "first", "expected first as
type returned")
+ assert.Equal(t, len(utilisation.NodesUtil), 10, "empty usage:
unexpected bucket count returned")
+ assert.Assert(t, confirmNodeCount(utilisation.NodesUtil, 2),
"unexpected number of nodes returned should be 2")
+
+ // make second type dominant by using all
+ resAlloc =
resources.NewResourceFromMap(map[string]resources.Quantity{"second": 5})
+ ask = objects.NewAllocationAsk("alloc-2", "app", resAlloc)
+ alloc = objects.NewAllocation("alloc-2-uuid", node2ID, ask)
+ assert.Assert(t, node2.AddAllocation(alloc), "unexpected failure adding
allocation to node")
+ err = rootQ.IncAllocatedResource(resAlloc, false)
+ assert.NilError(t, err, "unexpected error returned setting allocated
resource on queue")
+ // get nodes utilization
+ getNodeUtilisation(resp, req)
+ utilisation = &dao.NodesUtilDAOInfo{}
+ err = json.Unmarshal(resp.outputBytes, utilisation)
+ assert.NilError(t, err, "getNodeUtilisation should have returned an
object")
+ assert.Equal(t, utilisation.ResourceType, "second", "expected second as
type returned")
+ assert.Equal(t, len(utilisation.NodesUtil), 10, "empty usage:
unexpected bucket count returned")
+ assert.Assert(t, confirmNodeCount(utilisation.NodesUtil, 1),
"unexpected number of nodes returned should be 1")
+}
+
+func confirmNodeCount(info []*dao.NodeUtilDAOInfo, count int64) bool {
+ var total int64
+ for _, node := range info {
+ total += node.NumOfNodes
+ }
+ return total == count
}
func addAndConfirmApplicationExists(t *testing.T, partitionName string,
partition *scheduler.PartitionContext, appName string) *objects.Application {
diff --git a/pkg/webservice/routes.go b/pkg/webservice/routes.go
index e4df1c18..78957de0 100644
--- a/pkg/webservice/routes.go
+++ b/pkg/webservice/routes.go
@@ -253,4 +253,10 @@ var webRoutes = routes{
"/ws/v1/scheduler/healthcheck",
checkHealthStatus,
},
+ route{
+ "Scheduler",
+ "GET",
+ "/ws/v1/scheduler/node-utilization",
+ getNodeUtilisation,
+ },
}
diff --git a/pkg/webservice/webservice.go b/pkg/webservice/webservice.go
index ed5cef87..bf064dff 100644
--- a/pkg/webservice/webservice.go
+++ b/pkg/webservice/webservice.go
@@ -20,6 +20,7 @@ package webservice
import (
"context"
+ "errors"
"net/http"
"time"
@@ -60,6 +61,7 @@ func loggingHandler(inner http.Handler, name string)
http.HandlerFunc {
}
}
+// StartWebApp starts the web app on the default port.
// TODO we need the port to be configurable
func (m *WebService) StartWebApp() {
router := newRouter()
@@ -68,7 +70,7 @@ func (m *WebService) StartWebApp() {
log.Log(log.REST).Info("web-app started", zap.Int("port", 9080))
go func() {
httpError := m.httpServer.ListenAndServe()
- if httpError != nil && httpError != http.ErrServerClosed {
+ if httpError != nil && !errors.Is(httpError,
http.ErrServerClosed) {
log.Log(log.REST).Error("HTTP serving error",
zap.Error(httpError))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]