This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new bd589608 [YUNIKORN-2461] Consider supporting "/ws/v1/partition/{partitionName}/queue/{queueName}/applications/:state" (#938)
bd589608 is described below
commit bd5896082f53840dab2e48f39ab4f397b3780477
Author: Tzu-Hua Lan <[email protected]>
AuthorDate: Sun Aug 11 02:35:34 2024 +0800
[YUNIKORN-2461] Consider supporting "/ws/v1/partition/{partitionName}/queue/{queueName}/applications/:state" (#938)
Closes: #938
Signed-off-by: Chia-Ping Tsai <[email protected]>
---
pkg/webservice/handlers.go | 79 ++++++++++++++++++++++++++----
pkg/webservice/handlers_test.go | 104 ++++++++++++++++++++++++++++++++++++++++
pkg/webservice/routes.go | 6 +++
3 files changed, 179 insertions(+), 10 deletions(-)
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index 83003551..49309e70 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -62,6 +62,10 @@ const (
GroupDoesNotExists = "Group not found"
ApplicationDoesNotExists = "Application not found"
NodeDoesNotExists = "Node not found"
+
+ AppStateActive = "active"
+ AppStateRejected = "rejected"
+ AppStateCompleted = "completed"
)
var allowedActiveStatusMsg string
@@ -72,12 +76,12 @@ var maxRESTResponseSize atomic.Uint64
func init() {
allowedAppActiveStatuses = make(map[string]bool)
- allowedAppActiveStatuses["new"] = true
- allowedAppActiveStatuses["accepted"] = true
- allowedAppActiveStatuses["running"] = true
- allowedAppActiveStatuses["completing"] = true
- allowedAppActiveStatuses["failing"] = true
- allowedAppActiveStatuses["resuming"] = true
+ allowedAppActiveStatuses[strings.ToLower(objects.New.String())] = true
+ allowedAppActiveStatuses[strings.ToLower(objects.Accepted.String())] = true
+ allowedAppActiveStatuses[strings.ToLower(objects.Running.String())] = true
+ allowedAppActiveStatuses[strings.ToLower(objects.Completing.String())] = true
+ allowedAppActiveStatuses[strings.ToLower(objects.Failing.String())] = true
+ allowedAppActiveStatuses[strings.ToLower(objects.Resuming.String())] = true
var activeStatuses []string
for k := range allowedAppActiveStatuses {
@@ -798,7 +802,7 @@ func getPartitionApplicationsByState(w http.ResponseWriter, r *http.Request) {
}
var appList []*objects.Application
switch appState {
- case "active":
+ case AppStateActive:
if status := strings.ToLower(r.URL.Query().Get("status")); status != "" {
if !allowedAppActiveStatuses[status] {
buildJSONErrorResponse(w, allowedActiveStatusMsg, http.StatusBadRequest)
@@ -812,12 +816,12 @@ func getPartitionApplicationsByState(w http.ResponseWriter, r *http.Request) {
} else {
appList = partitionContext.GetApplications()
}
- case "rejected":
+ case AppStateRejected:
appList = partitionContext.GetRejectedApplications()
- case "completed":
+ case AppStateCompleted:
appList = partitionContext.GetCompletedApplications()
default:
- buildJSONErrorResponse(w, "Only following application states are allowed: active, rejected, completed", http.StatusBadRequest)
+ buildJSONErrorResponse(w, fmt.Sprintf("Only following application states are allowed: %s, %s, %s", AppStateActive, AppStateRejected, AppStateCompleted), http.StatusBadRequest)
return
}
appsDao := make([]*dao.ApplicationDAOInfo, 0, len(appList))
@@ -897,6 +901,61 @@ func getPartitionRules(w http.ResponseWriter, r *http.Request) {
}
}
+func getQueueApplicationsByState(w http.ResponseWriter, r *http.Request) {
+ writeHeaders(w)
+ vars := httprouter.ParamsFromContext(r.Context())
+ if vars == nil {
+ buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
+ return
+ }
+
+ partition := vars.ByName("partition")
+ queueName := vars.ByName("queue")
+ appState := strings.ToLower(vars.ByName("state"))
+ status := strings.ToLower(r.URL.Query().Get("status"))
+
+ partitionContext := schedulerContext.Load().GetPartitionWithoutClusterID(partition)
+ if partitionContext == nil {
+ buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusNotFound)
+ return
+ }
+ unescapedQueueName, err := url.QueryUnescape(queueName)
+ if err != nil {
+ buildJSONErrorResponse(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+ queueErr := validateQueue(unescapedQueueName)
+ if queueErr != nil {
+ buildJSONErrorResponse(w, queueErr.Error(), http.StatusBadRequest)
+ return
+ }
+ queue := partitionContext.GetQueue(unescapedQueueName)
+ if queue == nil {
+ buildJSONErrorResponse(w, QueueDoesNotExists, http.StatusNotFound)
+ return
+ }
+ if appState != AppStateActive {
+ buildJSONErrorResponse(w, fmt.Sprintf("Only following application states are allowed: %s", AppStateActive), http.StatusBadRequest)
+ return
+ }
+ if status != "" && !allowedAppActiveStatuses[status] {
+ buildJSONErrorResponse(w, allowedActiveStatusMsg, http.StatusBadRequest)
+ return
+ }
+
+ appsDao := make([]*dao.ApplicationDAOInfo, 0)
+ for _, app := range queue.GetCopyOfApps() {
+ if status == "" || strings.ToLower(app.CurrentState()) == status {
+ summary := app.GetApplicationSummary(partitionContext.RmID)
+ appsDao = append(appsDao, getApplicationDAO(app, summary))
+ }
+ }
+
+ if err := json.NewEncoder(w).Encode(appsDao); err != nil {
+ buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
+ }
+}
+
func getPartitionInfoDAO(lists map[string]*scheduler.PartitionContext) []*dao.PartitionInfo {
result := make([]*dao.PartitionInfo, 0, len(lists))
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index 8c9aaf06..00660a32 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -1608,6 +1608,110 @@ func TestGetPartitionApplicationsByStateHandler(t *testing.T) {
checkIllegalGetAppsRequest(t, "/ws/v1/partition/default/applications/Active", nil, assertParamsMissing)
}
+func checkGetQueueAppByState(t *testing.T, partition, queue, state, status string, expectedApp []*objects.Application) {
+ var url string
+ if status == "" {
+ url = fmt.Sprintf("/ws/v1/partition/%s/queue/%s/applications/%s", partition, queue, state)
+ } else {
+ url = fmt.Sprintf("/ws/v1/partition/%s/queue/%s/applications/%s?status=%s", partition, queue, state, status)
+ }
+ req, err := http.NewRequest("GET", url, strings.NewReader(""))
+ req = req.WithContext(context.WithValue(req.Context(), httprouter.ParamsKey, httprouter.Params{
+ httprouter.Param{Key: "partition", Value: partition},
+ httprouter.Param{Key: "queue", Value: queue},
+ httprouter.Param{Key: "state", Value: state},
+ }))
+
+ assert.NilError(t, err, "")
+ resp := &MockResponseWriter{}
+ getQueueApplicationsByState(resp, req)
+
+ var specificStatusApplicationsDAO []*dao.ApplicationDAOInfo
+ err = json.Unmarshal(resp.outputBytes, &specificStatusApplicationsDAO)
+ assert.NilError(t, err, unmarshalError)
+ assert.Equal(t, len(specificStatusApplicationsDAO), len(expectedApp))
+
+ daoAppIDs := make(map[string]bool)
+ expectedAppIDs := make(map[string]bool)
+ for _, app := range specificStatusApplicationsDAO {
+ daoAppIDs[app.ApplicationID] = true
+ }
+ for _, app := range expectedApp {
+ expectedAppIDs[app.ApplicationID] = true
+ }
+ assert.DeepEqual(t, daoAppIDs, expectedAppIDs)
+}
+
+func checkGetQueueAppByIllegalStateOrStatus(t *testing.T, partition, queue, state, status string) {
+ var url string
+ if status == "" {
+ url = fmt.Sprintf("/ws/v1/partition/%s/queue/%s/applications/%s", partition, queue, state)
+ } else {
+ url = fmt.Sprintf("/ws/v1/partition/%s/queue/%s/applications/%s?status=%s", partition, queue, state, status)
+ }
+ req, err := http.NewRequest("GET", url, strings.NewReader(""))
+ req = req.WithContext(context.WithValue(req.Context(), httprouter.ParamsKey, httprouter.Params{
+ httprouter.Param{Key: "partition", Value: partition},
+ httprouter.Param{Key: "queue", Value: queue},
+ httprouter.Param{Key: "state", Value: state},
+ }))
+
+ assert.NilError(t, err, "")
+ resp := &MockResponseWriter{}
+ getQueueApplicationsByState(resp, req)
+
+ var errInfo dao.YAPIError
+ err = json.Unmarshal(resp.outputBytes, &errInfo)
+ assert.NilError(t, err, unmarshalError)
+ assert.Equal(t, http.StatusBadRequest, resp.statusCode, statusCodeError)
+ var expectedErrMsg string
+ if strings.ToLower(state) != AppStateActive {
+ expectedErrMsg = fmt.Sprintf("Only following application states are allowed: %s", AppStateActive)
+ } else if status != "" {
+ expectedErrMsg = allowedActiveStatusMsg
+ }
+ assert.Equal(t, errInfo.Message, expectedErrMsg)
+ assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest)
+}
+
+func TestGetQueueApplicationsByStateHandler(t *testing.T) {
+ defaultPartition := setup(t, configDefault, 1)
+ NewWebApp(schedulerContext.Load(), nil)
+
+ // Accept status
+ app1 := addApp(t, "app-1", defaultPartition, "root.default", false)
+ app1.SetState(objects.New.String())
+ app2 := addApp(t, "app-2", defaultPartition, "root.default", false)
+ app2.SetState(objects.Accepted.String())
+ app3 := addApp(t, "app-3", defaultPartition, "root.default", false)
+ app3.SetState(objects.Running.String())
+ app4 := addApp(t, "app-4", defaultPartition, "root.default", false)
+ app4.SetState(objects.Completing.String())
+ app5 := addApp(t, "app-5", defaultPartition, "root.default", false)
+ app5.SetState(objects.Failing.String())
+ app6 := addApp(t, "app-6", defaultPartition, "root.default", false)
+ app6.SetState(objects.Resuming.String())
+
+ newStateAppList := []*objects.Application{app1}
+ checkGetQueueAppByState(t, "default", "root.default", "Active", "New", newStateAppList)
+ acceptedStatusAppList := []*objects.Application{app2}
+ checkGetQueueAppByState(t, "default", "root.default", "Active", "Accepted", acceptedStatusAppList)
+ runningStatusAppList := []*objects.Application{app3}
+ checkGetQueueAppByState(t, "default", "root.default", "Active", "Running", runningStatusAppList)
+ completingStatusAppList := []*objects.Application{app4}
+ checkGetQueueAppByState(t, "default", "root.default", "Active", "Completing", completingStatusAppList)
+ failingStatusAppList := []*objects.Application{app5}
+ checkGetQueueAppByState(t, "default", "root.default", "Active", "Failing", failingStatusAppList)
+ resumingStatusAppList := []*objects.Application{app6}
+ checkGetQueueAppByState(t, "default", "root.default", "Active", "Resuming", resumingStatusAppList)
+ allStatusAppList := []*objects.Application{app1, app2, app3, app4, app5, app6}
+ checkGetQueueAppByState(t, "default", "root.default", "Active", "", allStatusAppList)
+
+ checkGetQueueAppByIllegalStateOrStatus(t, "default", "root.default", "Rejected", "")
+ checkGetQueueAppByIllegalStateOrStatus(t, "default", "root.default", "Completed", "")
+ checkGetQueueAppByIllegalStateOrStatus(t, "default", "root.default", "Active", "Invalid")
+}
+
//nolint:funlen
func TestGetApplicationHandler(t *testing.T) {
part := setup(t, configDefault, 1)
diff --git a/pkg/webservice/routes.go b/pkg/webservice/routes.go
index 3eeef978..3a03ff5b 100644
--- a/pkg/webservice/routes.go
+++ b/pkg/webservice/routes.go
@@ -146,6 +146,12 @@ var webRoutes = routes{
"/ws/v1/partition/:partition/applications/:state",
getPartitionApplicationsByState,
},
+ route{
+ "Scheduler",
+ "GET",
+ "/ws/v1/partition/:partition/queue/:queue/applications/:state",
+ getQueueApplicationsByState,
+ },
route{
"Scheduler",
"GET",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]