This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new bd589608 [YUNIKORN-2461] Consider supporting "/ws/v1/partition/{partitionName}/queue/{queueName}/applications/:state" (#938)
bd589608 is described below

commit bd5896082f53840dab2e48f39ab4f397b3780477
Author: Tzu-Hua Lan <[email protected]>
AuthorDate: Sun Aug 11 02:35:34 2024 +0800

    [YUNIKORN-2461] Consider supporting "/ws/v1/partition/{partitionName}/queue/{queueName}/applications/:state" (#938)
    
    Closes: #938
    
    Signed-off-by: Chia-Ping Tsai <[email protected]>
---
 pkg/webservice/handlers.go      |  79 ++++++++++++++++++++++++++----
 pkg/webservice/handlers_test.go | 104 ++++++++++++++++++++++++++++++++++++++++
 pkg/webservice/routes.go        |   6 +++
 3 files changed, 179 insertions(+), 10 deletions(-)

diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index 83003551..49309e70 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -62,6 +62,10 @@ const (
        GroupDoesNotExists       = "Group not found"
        ApplicationDoesNotExists = "Application not found"
        NodeDoesNotExists        = "Node not found"
+
+       AppStateActive    = "active"
+       AppStateRejected  = "rejected"
+       AppStateCompleted = "completed"
 )
 
 var allowedActiveStatusMsg string
@@ -72,12 +76,12 @@ var maxRESTResponseSize atomic.Uint64
 func init() {
        allowedAppActiveStatuses = make(map[string]bool)
 
-       allowedAppActiveStatuses["new"] = true
-       allowedAppActiveStatuses["accepted"] = true
-       allowedAppActiveStatuses["running"] = true
-       allowedAppActiveStatuses["completing"] = true
-       allowedAppActiveStatuses["failing"] = true
-       allowedAppActiveStatuses["resuming"] = true
+       allowedAppActiveStatuses[strings.ToLower(objects.New.String())] = true
+       allowedAppActiveStatuses[strings.ToLower(objects.Accepted.String())] = 
true
+       allowedAppActiveStatuses[strings.ToLower(objects.Running.String())] = 
true
+       allowedAppActiveStatuses[strings.ToLower(objects.Completing.String())] 
= true
+       allowedAppActiveStatuses[strings.ToLower(objects.Failing.String())] = 
true
+       allowedAppActiveStatuses[strings.ToLower(objects.Resuming.String())] = 
true
 
        var activeStatuses []string
        for k := range allowedAppActiveStatuses {
@@ -798,7 +802,7 @@ func getPartitionApplicationsByState(w http.ResponseWriter, 
r *http.Request) {
        }
        var appList []*objects.Application
        switch appState {
-       case "active":
+       case AppStateActive:
                if status := strings.ToLower(r.URL.Query().Get("status")); 
status != "" {
                        if !allowedAppActiveStatuses[status] {
                                buildJSONErrorResponse(w, 
allowedActiveStatusMsg, http.StatusBadRequest)
@@ -812,12 +816,12 @@ func getPartitionApplicationsByState(w 
http.ResponseWriter, r *http.Request) {
                } else {
                        appList = partitionContext.GetApplications()
                }
-       case "rejected":
+       case AppStateRejected:
                appList = partitionContext.GetRejectedApplications()
-       case "completed":
+       case AppStateCompleted:
                appList = partitionContext.GetCompletedApplications()
        default:
-               buildJSONErrorResponse(w, "Only following application states 
are allowed: active, rejected, completed", http.StatusBadRequest)
+               buildJSONErrorResponse(w, fmt.Sprintf("Only following 
application states are allowed: %s, %s, %s", AppStateActive, AppStateRejected, 
AppStateCompleted), http.StatusBadRequest)
                return
        }
        appsDao := make([]*dao.ApplicationDAOInfo, 0, len(appList))
@@ -897,6 +901,61 @@ func getPartitionRules(w http.ResponseWriter, r 
*http.Request) {
        }
 }
 
+func getQueueApplicationsByState(w http.ResponseWriter, r *http.Request) {
+       writeHeaders(w)
+       vars := httprouter.ParamsFromContext(r.Context())
+       if vars == nil {
+               buildJSONErrorResponse(w, MissingParamsName, 
http.StatusBadRequest)
+               return
+       }
+
+       partition := vars.ByName("partition")
+       queueName := vars.ByName("queue")
+       appState := strings.ToLower(vars.ByName("state"))
+       status := strings.ToLower(r.URL.Query().Get("status"))
+
+       partitionContext := 
schedulerContext.Load().GetPartitionWithoutClusterID(partition)
+       if partitionContext == nil {
+               buildJSONErrorResponse(w, PartitionDoesNotExists, 
http.StatusNotFound)
+               return
+       }
+       unescapedQueueName, err := url.QueryUnescape(queueName)
+       if err != nil {
+               buildJSONErrorResponse(w, err.Error(), http.StatusBadRequest)
+               return
+       }
+       queueErr := validateQueue(unescapedQueueName)
+       if queueErr != nil {
+               buildJSONErrorResponse(w, queueErr.Error(), 
http.StatusBadRequest)
+               return
+       }
+       queue := partitionContext.GetQueue(unescapedQueueName)
+       if queue == nil {
+               buildJSONErrorResponse(w, QueueDoesNotExists, 
http.StatusNotFound)
+               return
+       }
+       if appState != AppStateActive {
+               buildJSONErrorResponse(w, fmt.Sprintf("Only following 
application states are allowed: %s", AppStateActive), http.StatusBadRequest)
+               return
+       }
+       if status != "" && !allowedAppActiveStatuses[status] {
+               buildJSONErrorResponse(w, allowedActiveStatusMsg, 
http.StatusBadRequest)
+               return
+       }
+
+       appsDao := make([]*dao.ApplicationDAOInfo, 0)
+       for _, app := range queue.GetCopyOfApps() {
+               if status == "" || strings.ToLower(app.CurrentState()) == 
status {
+                       summary := 
app.GetApplicationSummary(partitionContext.RmID)
+                       appsDao = append(appsDao, getApplicationDAO(app, 
summary))
+               }
+       }
+
+       if err := json.NewEncoder(w).Encode(appsDao); err != nil {
+               buildJSONErrorResponse(w, err.Error(), 
http.StatusInternalServerError)
+       }
+}
+
 func getPartitionInfoDAO(lists map[string]*scheduler.PartitionContext) 
[]*dao.PartitionInfo {
        result := make([]*dao.PartitionInfo, 0, len(lists))
 
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index 8c9aaf06..00660a32 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -1608,6 +1608,110 @@ func TestGetPartitionApplicationsByStateHandler(t 
*testing.T) {
        checkIllegalGetAppsRequest(t, 
"/ws/v1/partition/default/applications/Active", nil, assertParamsMissing)
 }
 
+func checkGetQueueAppByState(t *testing.T, partition, queue, state, status 
string, expectedApp []*objects.Application) {
+       var url string
+       if status == "" {
+               url = 
fmt.Sprintf("/ws/v1/partition/%s/queue/%s/applications/%s", partition, queue, 
state)
+       } else {
+               url = 
fmt.Sprintf("/ws/v1/partition/%s/queue/%s/applications/%s?status=%s", 
partition, queue, state, status)
+       }
+       req, err := http.NewRequest("GET", url, strings.NewReader(""))
+       req = req.WithContext(context.WithValue(req.Context(), 
httprouter.ParamsKey, httprouter.Params{
+               httprouter.Param{Key: "partition", Value: partition},
+               httprouter.Param{Key: "queue", Value: queue},
+               httprouter.Param{Key: "state", Value: state},
+       }))
+
+       assert.NilError(t, err, "")
+       resp := &MockResponseWriter{}
+       getQueueApplicationsByState(resp, req)
+
+       var specificStatusApplicationsDAO []*dao.ApplicationDAOInfo
+       err = json.Unmarshal(resp.outputBytes, &specificStatusApplicationsDAO)
+       assert.NilError(t, err, unmarshalError)
+       assert.Equal(t, len(specificStatusApplicationsDAO), len(expectedApp))
+
+       daoAppIDs := make(map[string]bool)
+       expectedAppIDs := make(map[string]bool)
+       for _, app := range specificStatusApplicationsDAO {
+               daoAppIDs[app.ApplicationID] = true
+       }
+       for _, app := range expectedApp {
+               expectedAppIDs[app.ApplicationID] = true
+       }
+       assert.DeepEqual(t, daoAppIDs, expectedAppIDs)
+}
+
+func checkGetQueueAppByIllegalStateOrStatus(t *testing.T, partition, queue, 
state, status string) {
+       var url string
+       if status == "" {
+               url = 
fmt.Sprintf("/ws/v1/partition/%s/queue/%s/applications/%s", partition, queue, 
state)
+       } else {
+               url = 
fmt.Sprintf("/ws/v1/partition/%s/queue/%s/applications/%s?status=%s", 
partition, queue, state, status)
+       }
+       req, err := http.NewRequest("GET", url, strings.NewReader(""))
+       req = req.WithContext(context.WithValue(req.Context(), 
httprouter.ParamsKey, httprouter.Params{
+               httprouter.Param{Key: "partition", Value: partition},
+               httprouter.Param{Key: "queue", Value: queue},
+               httprouter.Param{Key: "state", Value: state},
+       }))
+
+       assert.NilError(t, err, "")
+       resp := &MockResponseWriter{}
+       getQueueApplicationsByState(resp, req)
+
+       var errInfo dao.YAPIError
+       err = json.Unmarshal(resp.outputBytes, &errInfo)
+       assert.NilError(t, err, unmarshalError)
+       assert.Equal(t, http.StatusBadRequest, resp.statusCode, statusCodeError)
+       var expectedErrMsg string
+       if strings.ToLower(state) != AppStateActive {
+               expectedErrMsg = fmt.Sprintf("Only following application states 
are allowed: %s", AppStateActive)
+       } else if status != "" {
+               expectedErrMsg = allowedActiveStatusMsg
+       }
+       assert.Equal(t, errInfo.Message, expectedErrMsg)
+       assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest)
+}
+
+func TestGetQueueApplicationsByStateHandler(t *testing.T) {
+       defaultPartition := setup(t, configDefault, 1)
+       NewWebApp(schedulerContext.Load(), nil)
+
+       // Accept status
+       app1 := addApp(t, "app-1", defaultPartition, "root.default", false)
+       app1.SetState(objects.New.String())
+       app2 := addApp(t, "app-2", defaultPartition, "root.default", false)
+       app2.SetState(objects.Accepted.String())
+       app3 := addApp(t, "app-3", defaultPartition, "root.default", false)
+       app3.SetState(objects.Running.String())
+       app4 := addApp(t, "app-4", defaultPartition, "root.default", false)
+       app4.SetState(objects.Completing.String())
+       app5 := addApp(t, "app-5", defaultPartition, "root.default", false)
+       app5.SetState(objects.Failing.String())
+       app6 := addApp(t, "app-6", defaultPartition, "root.default", false)
+       app6.SetState(objects.Resuming.String())
+
+       newStateAppList := []*objects.Application{app1}
+       checkGetQueueAppByState(t, "default", "root.default", "Active", "New", 
newStateAppList)
+       acceptedStatusAppList := []*objects.Application{app2}
+       checkGetQueueAppByState(t, "default", "root.default", "Active", 
"Accepted", acceptedStatusAppList)
+       runningStatusAppList := []*objects.Application{app3}
+       checkGetQueueAppByState(t, "default", "root.default", "Active", 
"Running", runningStatusAppList)
+       completingStatusAppList := []*objects.Application{app4}
+       checkGetQueueAppByState(t, "default", "root.default", "Active", 
"Completing", completingStatusAppList)
+       failingStatusAppList := []*objects.Application{app5}
+       checkGetQueueAppByState(t, "default", "root.default", "Active", 
"Failing", failingStatusAppList)
+       resumingStatusAppList := []*objects.Application{app6}
+       checkGetQueueAppByState(t, "default", "root.default", "Active", 
"Resuming", resumingStatusAppList)
+       allStatusAppList := []*objects.Application{app1, app2, app3, app4, 
app5, app6}
+       checkGetQueueAppByState(t, "default", "root.default", "Active", "", 
allStatusAppList)
+
+       checkGetQueueAppByIllegalStateOrStatus(t, "default", "root.default", 
"Rejected", "")
+       checkGetQueueAppByIllegalStateOrStatus(t, "default", "root.default", 
"Completed", "")
+       checkGetQueueAppByIllegalStateOrStatus(t, "default", "root.default", 
"Active", "Invalid")
+}
+
 //nolint:funlen
 func TestGetApplicationHandler(t *testing.T) {
        part := setup(t, configDefault, 1)
diff --git a/pkg/webservice/routes.go b/pkg/webservice/routes.go
index 3eeef978..3a03ff5b 100644
--- a/pkg/webservice/routes.go
+++ b/pkg/webservice/routes.go
@@ -146,6 +146,12 @@ var webRoutes = routes{
                "/ws/v1/partition/:partition/applications/:state",
                getPartitionApplicationsByState,
        },
+       route{
+               "Scheduler",
+               "GET",
+               "/ws/v1/partition/:partition/queue/:queue/applications/:state",
+               getQueueApplicationsByState,
+       },
        route{
                "Scheduler",
                "GET",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to