This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 125373f2 [YUNIKORN-2652] Expand getApplication() endpoint handler to
return resource usage
125373f2 is described below
commit 125373f2a4c5dff5212cad35aeebdccbd457fb8c
Author: Rich Scott <[email protected]>
AuthorDate: Tue Jul 30 20:45:12 2024 +0200
[YUNIKORN-2652] Expand getApplication() endpoint handler to return resource
usage
Closes: #897
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/common/resources/tracked_resources.go | 39 ++++++++++++++++++
pkg/common/resources/tracked_resources_test.go | 53 ++++++++++++++++++++++++
pkg/webservice/dao/application_info.go | 46 ++++++++++++---------
pkg/webservice/handlers.go | 57 +++++++++++++++-----------
pkg/webservice/handlers_test.go | 35 ++++++++++++++--
5 files changed, 183 insertions(+), 47 deletions(-)
diff --git a/pkg/common/resources/tracked_resources.go
b/pkg/common/resources/tracked_resources.go
index 5198c337..09e8a022 100644
--- a/pkg/common/resources/tracked_resources.go
+++ b/pkg/common/resources/tracked_resources.go
@@ -103,3 +103,42 @@ func (tr *TrackedResource)
AggregateTrackedResource(instType string,
}
tr.TrackedResourceMap[instType] = aggregatedResourceTime
}
+
+// EqualsTracked reports whether left and right hold the same tracked resources.
+//
+// The same pointer (including two nil pointers) compares equal, and a nil
+// pointer never equals a non-nil one. Otherwise both values must carry the
+// same set of first-level keys in TrackedResourceMap, and the inner maps stored
+// under each key must match according to equalsMapContents.
+func EqualsTracked(left, right *TrackedResource) bool {
+	if left == right {
+		return true
+	}
+
+	if left == nil || right == nil {
+		return false
+	}
+
+	// The loop below only visits the keys of left. Without this check a key
+	// that exists only in right would go unnoticed and the result would
+	// depend on the argument order.
+	if len(left.TrackedResourceMap) != len(right.TrackedResourceMap) {
+		return false
+	}
+
+	for k, v := range left.TrackedResourceMap {
+		inner, ok := right.TrackedResourceMap[k]
+		if !ok {
+			return false
+		}
+
+		if !equalsMapContents(v, inner) {
+			return false
+		}
+	}
+
+	return true
+}
+
+// equalsMapContents reports whether left and right agree on every key found in
+// either map. Looking up an absent key yields zero, so a key stored with the
+// value 0 on one side is treated the same as a key missing on the other side.
+func equalsMapContents(left, right map[string]int64) bool {
+	// agrees checks that each entry of src has an identical value in dst.
+	agrees := func(src, dst map[string]int64) bool {
+		for name, amount := range src {
+			if dst[name] != amount {
+				return false
+			}
+		}
+		return true
+	}
+
+	return agrees(left, right) && agrees(right, left)
+}
diff --git a/pkg/common/resources/tracked_resources_test.go
b/pkg/common/resources/tracked_resources_test.go
index 0e204379..5ca00832 100644
--- a/pkg/common/resources/tracked_resources_test.go
+++ b/pkg/common/resources/tracked_resources_test.go
@@ -23,6 +23,8 @@ import (
"reflect"
"testing"
"time"
+
+ "gotest.tools/v3/assert"
)
func CheckLenOfTrackedResource(res *TrackedResource, expected int) (bool,
string) {
@@ -210,3 +212,54 @@ func TestTrackedResourceAggregateTrackedResource(t
*testing.T) {
})
}
}
+
+// TestEqualsTracked exercises EqualsTracked with pairs of nested maps.
+// A nil map in the table leaves the corresponding *TrackedResource nil; every
+// pair is compared in both argument orders and must yield the same result.
+func TestEqualsTracked(t *testing.T) {
+	type inputs struct {
+		base    map[string]map[string]int64
+		compare map[string]map[string]int64
+	}
+	var tests = []struct {
+		caseName string
+		input    inputs
+		expected bool
+	}{
+		// Case names are kept distinct so a failing subtest can be told apart in the output.
+		{"simple cases (nil checks): both sides nil", inputs{nil, nil}, true},
+		{"simple cases (nil checks): only one side nil", inputs{map[string]map[string]int64{}, nil}, false},
+		{"same first and second level keys and different resource value",
+			inputs{map[string]map[string]int64{"first": {"val": 10}}, map[string]map[string]int64{"first": {"val": 0}}},
+			false,
+		},
+		{"different first-level key, same second-level key, same resource value",
+			inputs{map[string]map[string]int64{"first": {"val": 10}}, map[string]map[string]int64{"second": {"val": 10}}},
+			false},
+		{"same first-level key, different second-level key, same resource value",
+			inputs{map[string]map[string]int64{"first": {"val": 10}}, map[string]map[string]int64{"first": {"value": 10}}},
+			false},
+		{"same first-level key, second has larger sub-level map",
+			inputs{map[string]map[string]int64{"first": {"val": 10}}, map[string]map[string]int64{"first": {"val": 10, "sum": 7}}},
+			false},
+		{"same first-level key, first has larger sub-level map",
+			inputs{map[string]map[string]int64{"first": {"val": 10, "sum": 7}}, map[string]map[string]int64{"first": {"val": 10}}},
+			false},
+		{"same keys and values",
+			inputs{map[string]map[string]int64{"x": {"val": 10, "sum": 7}}, map[string]map[string]int64{"x": {"val": 10, "sum": 7}}},
+			true},
+	}
+	for _, tt := range tests {
+		t.Run(tt.caseName, func(t *testing.T) {
+			// Only build a TrackedResource for non-nil table entries so the nil-pointer paths are covered.
+			var base, compare *TrackedResource
+			if tt.input.base != nil {
+				base = NewTrackedResourceFromMap(tt.input.base)
+			}
+			if tt.input.compare != nil {
+				compare = NewTrackedResourceFromMap(tt.input.compare)
+			}
+
+			result := EqualsTracked(base, compare)
+			assert.Assert(t, result == tt.expected, "Equal result should be %v instead of %v, left %v, right %v", tt.expected, result, base, compare)
+
+			// Equality must not depend on the argument order.
+			result = EqualsTracked(compare, base)
+			assert.Assert(t, result == tt.expected, "Equal result should be %v instead of %v, left %v, right %v", tt.expected, result, compare, base)
+		})
+	}
+}
diff --git a/pkg/webservice/dao/application_info.go
b/pkg/webservice/dao/application_info.go
index b0760d8e..5c2e902a 100644
--- a/pkg/webservice/dao/application_info.go
+++ b/pkg/webservice/dao/application_info.go
@@ -18,30 +18,38 @@
package dao
+import (
+ "github.com/apache/yunikorn-core/pkg/common/resources"
+)
+
type ApplicationsDAOInfo struct {
Applications []ApplicationDAOInfo `json:"applications,omitempty"`
}
type ApplicationDAOInfo struct {
- ApplicationID string `json:"applicationID"` // no
omitempty, application id should not be empty
- UsedResource map[string]int64
`json:"usedResource,omitempty"`
- MaxUsedResource map[string]int64
`json:"maxUsedResource,omitempty"`
- PendingResource map[string]int64
`json:"pendingResource,omitempty"`
- Partition string `json:"partition"` // no
omitempty, partition should not be empty
- QueueName string `json:"queueName"` // no
omitempty, queue name should not be empty
- SubmissionTime int64
`json:"submissionTime,omitempty"`
- FinishedTime *int64
`json:"finishedTime,omitempty"`
- Requests []*AllocationAskDAOInfo `json:"requests,omitempty"`
- Allocations []*AllocationDAOInfo
`json:"allocations,omitempty"`
- State string
`json:"applicationState,omitempty"`
- User string `json:"user,omitempty"`
- Groups []string `json:"groups,omitempty"`
- RejectedMessage string
`json:"rejectedMessage,omitempty"`
- StateLog []*StateDAOInfo `json:"stateLog,omitempty"`
- PlaceholderData []*PlaceholderDAOInfo
`json:"placeholderData,omitempty"`
- HasReserved bool
`json:"hasReserved,omitempty"`
- Reservations []string
`json:"reservations,omitempty"`
- MaxRequestPriority int32
`json:"maxRequestPriority,omitempty"`
+ ApplicationID string `json:"applicationID"`
// no omitempty, application id should not be empty
+ UsedResource map[string]int64
`json:"usedResource,omitempty"`
+ MaxUsedResource map[string]int64
`json:"maxUsedResource,omitempty"`
+ PendingResource map[string]int64
`json:"pendingResource,omitempty"`
+ Partition string `json:"partition"` // no
omitempty, partition should not be empty
+ QueueName string `json:"queueName"` // no
omitempty, queue name should not be empty
+ SubmissionTime int64
`json:"submissionTime,omitempty"`
+ FinishedTime *int64
`json:"finishedTime,omitempty"`
+ Requests []*AllocationAskDAOInfo
`json:"requests,omitempty"`
+ Allocations []*AllocationDAOInfo
`json:"allocations,omitempty"`
+ State string
`json:"applicationState,omitempty"`
+ User string `json:"user,omitempty"`
+ Groups []string `json:"groups,omitempty"`
+ RejectedMessage string
`json:"rejectedMessage,omitempty"`
+ StateLog []*StateDAOInfo
`json:"stateLog,omitempty"`
+ PlaceholderData []*PlaceholderDAOInfo
`json:"placeholderData,omitempty"`
+ HasReserved bool
`json:"hasReserved,omitempty"`
+ Reservations []string
`json:"reservations,omitempty"`
+ MaxRequestPriority int32
`json:"maxRequestPriority,omitempty"`
+ StartTime int64
`json:"startTime,omitempty"` // filled from app.StartTime().UnixMilli() in getApplicationDAO, i.e. Unix milliseconds
+ ResourceUsage *resources.TrackedResource
`json:"resourceUsage,omitempty"` // taken from ApplicationSummary.ResourceUsage in getApplicationDAO
+ PreemptedResource *resources.TrackedResource
`json:"preemptedResource,omitempty"` // taken from ApplicationSummary.PreemptedResource in getApplicationDAO
+ PlaceholderResource *resources.TrackedResource
`json:"placeholderResource,omitempty"` // taken from ApplicationSummary.PlaceholderResource in getApplicationDAO
}
type StateDAOInfo struct {
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index a91baf73..2b6145db 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -294,31 +294,35 @@ func getStatesDAO(entries []*objects.StateLogEntry)
[]*dao.StateDAOInfo {
return statesDAO
}
-func getApplicationDAO(app *objects.Application) *dao.ApplicationDAOInfo {
+func getApplicationDAO(app *objects.Application, summary
*objects.ApplicationSummary) *dao.ApplicationDAOInfo { // NOTE(review): only app is nil-checked below; summary is dereferenced unguarded — confirm callers never pass nil
if app == nil {
return &dao.ApplicationDAOInfo{}
}
return &dao.ApplicationDAOInfo{
- ApplicationID: app.ApplicationID,
- UsedResource: app.GetAllocatedResource().DAOMap(),
- MaxUsedResource: app.GetMaxAllocatedResource().DAOMap(),
- PendingResource: app.GetPendingResource().DAOMap(),
- Partition:
common.GetPartitionNameWithoutClusterID(app.Partition),
- QueueName: app.GetQueuePath(),
- SubmissionTime: app.SubmissionTime.UnixNano(),
- FinishedTime:
common.ZeroTimeInUnixNano(app.FinishedTime()),
- Requests: getAllocationAsksDAO(app.GetAllRequests()),
- Allocations: getAllocationsDAO(app.GetAllAllocations()),
- State: app.CurrentState(),
- User: app.GetUser().User,
- Groups: app.GetUser().Groups,
- RejectedMessage: app.GetRejectedMessage(),
- PlaceholderData:
getPlaceholdersDAO(app.GetAllPlaceholderData()),
- StateLog: getStatesDAO(app.GetStateLog()),
- HasReserved: app.HasReserved(),
- Reservations: app.GetReservations(),
- MaxRequestPriority: app.GetAskMaxPriority(),
+ ApplicationID: app.ApplicationID,
+ UsedResource: app.GetAllocatedResource().DAOMap(),
+ MaxUsedResource: app.GetMaxAllocatedResource().DAOMap(),
+ PendingResource: app.GetPendingResource().DAOMap(),
+ Partition:
common.GetPartitionNameWithoutClusterID(app.Partition),
+ QueueName: app.GetQueuePath(),
+ SubmissionTime: app.SubmissionTime.UnixNano(),
+ FinishedTime:
common.ZeroTimeInUnixNano(app.FinishedTime()),
+ Requests: getAllocationAsksDAO(app.GetAllRequests()),
+ Allocations: getAllocationsDAO(app.GetAllAllocations()),
+ State: app.CurrentState(),
+ User: app.GetUser().User,
+ Groups: app.GetUser().Groups,
+ RejectedMessage: app.GetRejectedMessage(),
+ PlaceholderData:
getPlaceholdersDAO(app.GetAllPlaceholderData()),
+ StateLog: getStatesDAO(app.GetStateLog()),
+ HasReserved: app.HasReserved(),
+ Reservations: app.GetReservations(),
+ MaxRequestPriority: app.GetAskMaxPriority(),
+ StartTime: app.StartTime().UnixMilli(), // milliseconds, whereas SubmissionTime above is reported via UnixNano
+ ResourceUsage: summary.ResourceUsage, // this and the two fields below are passed through from summary as-is
+ PreemptedResource: summary.PreemptedResource,
+ PlaceholderResource: summary.PlaceholderResource,
}
}
@@ -769,7 +773,8 @@ func getQueueApplications(w http.ResponseWriter, r
*http.Request) {
appsDao := make([]*dao.ApplicationDAOInfo, 0)
for _, app := range queue.GetCopyOfApps() {
- appsDao = append(appsDao, getApplicationDAO(app))
+ summary := app.GetApplicationSummary(partitionContext.RmID)
+ appsDao = append(appsDao, getApplicationDAO(app, summary))
}
if err := json.NewEncoder(w).Encode(appsDao); err != nil {
@@ -818,7 +823,8 @@ func getPartitionApplicationsByState(w http.ResponseWriter,
r *http.Request) {
}
appsDao := make([]*dao.ApplicationDAOInfo, 0, len(appList))
for _, app := range appList {
- appsDao = append(appsDao, getApplicationDAO(app))
+ summary := app.GetApplicationSummary(partitionContext.RmID)
+ appsDao = append(appsDao, getApplicationDAO(app, summary))
}
if err := json.NewEncoder(w).Encode(appsDao); err != nil {
buildJSONErrorResponse(w, err.Error(),
http.StatusInternalServerError)
@@ -865,7 +871,9 @@ func getApplication(w http.ResponseWriter, r *http.Request)
{
buildJSONErrorResponse(w, ApplicationDoesNotExists,
http.StatusNotFound)
return
}
- appDao := getApplicationDAO(app)
+
+ summary := app.GetApplicationSummary(partitionContext.RmID)
+ appDao := getApplicationDAO(app, summary)
if err := json.NewEncoder(w).Encode(appDao); err != nil {
buildJSONErrorResponse(w, err.Error(),
http.StatusInternalServerError)
}
@@ -989,7 +997,8 @@ func getApplicationsDAO(lists
map[string]*scheduler.PartitionContext) []*dao.App
appList = append(appList,
partition.GetRejectedApplications()...)
for _, app := range appList {
- result = append(result, getApplicationDAO(app))
+ summary := app.GetApplicationSummary(partition.RmID)
+ result = append(result, getApplicationDAO(app, summary))
}
}
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index 31bf9cfb..9cca0dc6 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -1544,25 +1544,29 @@ func TestGetPartitionApplicationsByStateHandler(t
*testing.T) {
defaultPartition.AddRejectedApplication(app3, rejectedMessage)
// test get active app
- expectedActiveDao := []*dao.ApplicationDAOInfo{getApplicationDAO(app1),
getApplicationDAO(app2)}
+ sum1 := app1.GetApplicationSummary(defaultPartition.RmID)
+ sum2 := app2.GetApplicationSummary(defaultPartition.RmID)
+ expectedActiveDao := []*dao.ApplicationDAOInfo{getApplicationDAO(app1,
sum1), getApplicationDAO(app2, sum2)}
checkLegalGetAppsRequest(t,
"/ws/v1/partition/default/applications/Active", httprouter.Params{
httprouter.Param{Key: "partition", Value:
partitionNameWithoutClusterID},
httprouter.Param{Key: "state", Value: "Active"}},
expectedActiveDao)
// test get active app with running state
- expectedRunningDao := []*dao.ApplicationDAOInfo{getApplicationDAO(app2)}
+ expectedRunningDao := []*dao.ApplicationDAOInfo{getApplicationDAO(app2,
sum2)}
checkLegalGetAppsRequest(t,
"/ws/v1/partition/default/applications/Active?status=Running",
httprouter.Params{
httprouter.Param{Key: "partition", Value:
partitionNameWithoutClusterID},
httprouter.Param{Key: "state", Value: "Active"}},
expectedRunningDao)
// test get completed app
- expectedCompletedDao :=
[]*dao.ApplicationDAOInfo{getApplicationDAO(app3)}
+ sum3 := app3.GetApplicationSummary(defaultPartition.RmID)
+ expectedCompletedDao :=
[]*dao.ApplicationDAOInfo{getApplicationDAO(app3, sum3)}
checkLegalGetAppsRequest(t,
"/ws/v1/partition/default/applications/Completed", httprouter.Params{
httprouter.Param{Key: "partition", Value:
partitionNameWithoutClusterID},
httprouter.Param{Key: "state", Value: "Completed"}},
expectedCompletedDao)
// test get rejected app
- expectedRejectedDao :=
[]*dao.ApplicationDAOInfo{getApplicationDAO(app4)}
+ sum4 := app4.GetApplicationSummary(defaultPartition.RmID)
+ expectedRejectedDao :=
[]*dao.ApplicationDAOInfo{getApplicationDAO(app4, sum4)}
checkLegalGetAppsRequest(t,
"/ws/v1/partition/default/applications/Rejected", httprouter.Params{
httprouter.Param{Key: "partition", Value:
partitionNameWithoutClusterID},
httprouter.Param{Key: "state", Value: "Rejected"}},
expectedRejectedDao)
@@ -1586,6 +1590,7 @@ func TestGetPartitionApplicationsByStateHandler(t
*testing.T) {
checkIllegalGetAppsRequest(t,
"/ws/v1/partition/default/applications/Active", nil, assertParamsMissing)
}
+//nolint:funlen
func TestGetApplicationHandler(t *testing.T) {
part := setup(t, configDefault, 1)
@@ -1690,6 +1695,28 @@ func TestGetApplicationHandler(t *testing.T) {
assert.Equal(t, http.StatusBadRequest, resp.statusCode, statusCodeError)
assert.Equal(t, errInfo.Message, "invalid URL escape \"%Zt\"",
jsonMessageError)
assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest)
+
+ // test additional application details
+ var req6 *http.Request
+ req6, err = http.NewRequest("GET",
"/ws/v1/partition/default/queue/root.default/application/app-1",
strings.NewReader(""))
+ assert.NilError(t, err, "HTTP request create failed")
+ req6 = req6.WithContext(context.WithValue(req.Context(),
httprouter.ParamsKey, httprouter.Params{
+ httprouter.Param{Key: "partition", Value:
partitionNameWithoutClusterID},
+ httprouter.Param{Key: "queue", Value: "root.default"},
+ httprouter.Param{Key: "application", Value: "app-1"},
+ }))
+ assert.NilError(t, err, "Get Application Handler request failed")
+ resp6 := &MockResponseWriter{}
+ var appDao *dao.ApplicationDAOInfo
+ getApplication(resp6, req6)
+ appSummary := app.GetApplicationSummary(partitionNameWithoutClusterID)
+ err = json.Unmarshal(resp6.outputBytes, &appDao)
+ assert.NilError(t, err, unmarshalError)
+ assert.Equal(t, "app-1", appDao.ApplicationID)
+ assert.Equal(t, app.StartTime().UnixMilli(), appDao.StartTime)
+ assert.Assert(t, resources.EqualsTracked(appSummary.ResourceUsage,
appDao.ResourceUsage))
+ assert.Assert(t, resources.EqualsTracked(appSummary.PreemptedResource,
appDao.PreemptedResource))
+ assert.Assert(t,
resources.EqualsTracked(appSummary.PlaceholderResource,
appDao.PlaceholderResource))
}
func assertParamsMissing(t *testing.T, resp *MockResponseWriter) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]