This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 77e19f6a [YUNIKORN-2235] Add new RESTful API for retrieving 
application (#750)
77e19f6a is described below

commit 77e19f6aca27966c3a83f14d0388ddeeb5e3cbe8
Author: Xie Yifan <[email protected]>
AuthorDate: Fri Jan 5 17:43:27 2024 +0800

    [YUNIKORN-2235] Add new RESTful API for retrieving application (#750)
    
    Closes: #750
    
    Signed-off-by: Chia-Ping Tsai <[email protected]>
---
 pkg/scheduler/objects/application_state.go |   1 +
 pkg/scheduler/partition.go                 |   4 +
 pkg/scheduler/partition_test.go            |  17 ++++
 pkg/webservice/handlers.go                 |  94 ++++++++++++++++---
 pkg/webservice/handlers_test.go            | 140 +++++++++++++++++++++++++++++
 pkg/webservice/routes.go                   |  12 +++
 6 files changed, 258 insertions(+), 10 deletions(-)

diff --git a/pkg/scheduler/objects/application_state.go 
b/pkg/scheduler/objects/application_state.go
index 9ef98bf1..d07a72cc 100644
--- a/pkg/scheduler/objects/application_state.go
+++ b/pkg/scheduler/objects/application_state.go
@@ -60,6 +60,7 @@ func (ae applicationEvent) String() string {
 // ----------------------------------
 type applicationState int
 
+// Application states are used for filtering in the webservice handlers. 
Please check&update the logic as needed if the state machine is modified
 const (
        New applicationState = iota
        Accepted
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 16cad537..018b8288 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -425,6 +425,10 @@ func (pc *PartitionContext) removeAppInternal(appID 
string) *objects.Application
        return app
 }
 
+func (pc *PartitionContext) GetApplication(appID string) *objects.Application {
+       return pc.getApplication(appID)
+}
+
 func (pc *PartitionContext) getApplication(appID string) *objects.Application {
        pc.RLock()
        defer pc.RUnlock()
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 1565f5fb..3e59a3f2 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -1451,6 +1451,23 @@ func TestUpdateQueues(t *testing.T) {
        assertUpdateQueues(t, "both", map[string]string{})
 }
 
+func TestGetApplication(t *testing.T) {
+       partition, err := newBasePartition()
+       assert.NilError(t, err, "partition create failed")
+       app := newApplication(appID1, "default", defQueue)
+       err = partition.AddApplication(app)
+       assert.NilError(t, err, "no error expected while adding the 
application")
+       assert.Equal(t, partition.GetApplication(appID1), app, "partition 
failed to add app incorrect app returned")
+       app2 := newApplication(appID2, "default", "unknown")
+       err = partition.AddApplication(app2)
+       if err == nil {
+               t.Error("app-2 should not have been added to the partition")
+       }
+       if partition.GetApplication(appID2) != nil {
+               t.Fatal("partition added app incorrectly should have failed")
+       }
+}
+
 func TestGetQueue(t *testing.T) {
        // get the partition
        partition, err := newBasePartition()
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index c159cb95..46bbb760 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -60,6 +60,27 @@ const (
        NodeDoesNotExists        = "Node not found"
 )
 
+var allowedActiveStatusMsg string
+var allowedAppActiveStatuses map[string]bool
+
+func init() {
+       allowedAppActiveStatuses = make(map[string]bool)
+
+       allowedAppActiveStatuses["new"] = true
+       allowedAppActiveStatuses["accepted"] = true
+       allowedAppActiveStatuses["starting"] = true
+       allowedAppActiveStatuses["running"] = true
+       allowedAppActiveStatuses["completing"] = true
+       allowedAppActiveStatuses["failing"] = true
+       allowedAppActiveStatuses["resuming"] = true
+
+       var activeStatuses []string
+       for k := range allowedAppActiveStatuses {
+               activeStatuses = append(activeStatuses, k)
+       }
+       allowedActiveStatusMsg = fmt.Sprintf("Only following active statuses 
are allowed: %s", strings.Join(activeStatuses, ","))
+}
+
 func getStackInfo(w http.ResponseWriter, r *http.Request) {
        writeHeaders(w)
        var stack = func() []byte {
@@ -619,6 +640,54 @@ func getQueueApplications(w http.ResponseWriter, r 
*http.Request) {
        }
 }
 
+func getPartitionApplicationsByState(w http.ResponseWriter, r *http.Request) {
+       writeHeaders(w)
+       vars := httprouter.ParamsFromContext(r.Context())
+       if vars == nil {
+               buildJSONErrorResponse(w, MissingParamsName, 
http.StatusBadRequest)
+               return
+       }
+       partition := vars.ByName("partition")
+       appState := strings.ToLower(vars.ByName("state"))
+
+       partitionContext := 
schedulerContext.GetPartitionWithoutClusterID(partition)
+       if partitionContext == nil {
+               buildJSONErrorResponse(w, PartitionDoesNotExists, 
http.StatusNotFound)
+               return
+       }
+       var appList []*objects.Application
+       switch appState {
+       case "active":
+               if status := strings.ToLower(r.URL.Query().Get("status")); 
status != "" {
+                       if !allowedAppActiveStatuses[status] {
+                               buildJSONErrorResponse(w, 
allowedActiveStatusMsg, http.StatusBadRequest)
+                               return
+                       }
+                       for _, app := range partitionContext.GetApplications() {
+                               if strings.ToLower(app.CurrentState()) == 
status {
+                                       appList = append(appList, app)
+                               }
+                       }
+               } else {
+                       appList = partitionContext.GetApplications()
+               }
+       case "rejected":
+               appList = partitionContext.GetRejectedApplications()
+       case "completed":
+               appList = partitionContext.GetCompletedApplications()
+       default:
+               buildJSONErrorResponse(w, "Only following application states 
are allowed: active, rejected, completed", http.StatusBadRequest)
+               return
+       }
+       appsDao := make([]*dao.ApplicationDAOInfo, 0, len(appList))
+       for _, app := range appList {
+               appsDao = append(appsDao, getApplicationDAO(app))
+       }
+       if err := json.NewEncoder(w).Encode(appsDao); err != nil {
+               buildJSONErrorResponse(w, err.Error(), 
http.StatusInternalServerError)
+       }
+}
+
 func getApplication(w http.ResponseWriter, r *http.Request) {
        writeHeaders(w)
        vars := httprouter.ParamsFromContext(r.Context())
@@ -629,22 +698,27 @@ func getApplication(w http.ResponseWriter, r 
*http.Request) {
        partition := vars.ByName("partition")
        queueName := vars.ByName("queue")
        application := vars.ByName("application")
-       queueErr := validateQueue(queueName)
-       if queueErr != nil {
-               buildJSONErrorResponse(w, queueErr.Error(), 
http.StatusBadRequest)
-               return
-       }
        partitionContext := 
schedulerContext.GetPartitionWithoutClusterID(partition)
        if partitionContext == nil {
                buildJSONErrorResponse(w, PartitionDoesNotExists, 
http.StatusNotFound)
                return
        }
-       queue := partitionContext.GetQueue(queueName)
-       if queue == nil {
-               buildJSONErrorResponse(w, QueueDoesNotExists, 
http.StatusNotFound)
-               return
+       var app *objects.Application
+       if len(queueName) == 0 {
+               app = partitionContext.GetApplication(application)
+       } else {
+               queueErr := validateQueue(queueName)
+               if queueErr != nil {
+                       buildJSONErrorResponse(w, queueErr.Error(), 
http.StatusBadRequest)
+                       return
+               }
+               queue := partitionContext.GetQueue(queueName)
+               if queue == nil {
+                       buildJSONErrorResponse(w, QueueDoesNotExists, 
http.StatusNotFound)
+                       return
+               }
+               app = queue.GetApplication(application)
        }
-       app := queue.GetApplication(application)
        if app == nil {
                buildJSONErrorResponse(w, ApplicationDoesNotExists, 
http.StatusNotFound)
                return
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index 12c681b5..406261c5 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -1093,8 +1093,14 @@ func addAppWithUserGroup(t *testing.T, id string, part 
*scheduler.PartitionConte
        assert.Equal(t, 1+initSize, len(part.GetApplications()))
        if isCompleted {
                app.SetState(objects.Completing.String())
+               currentCount := len(part.GetCompletedApplications())
                err = app.HandleApplicationEvent(objects.CompleteApplication)
                assert.NilError(t, err, "The app should have completed")
+               err = common.WaitFor(10*time.Millisecond, time.Second, func() 
bool {
+                       newCount := len(part.GetCompletedApplications())
+                       return newCount == currentCount+1
+               })
+               assert.NilError(t, err, "the completed application should have 
been processed")
        }
        return app
 }
@@ -1198,6 +1204,90 @@ func TestGetQueueApplicationsHandler(t *testing.T) {
        assertParamsMissing(t, resp)
 }
 
+func checkLegalGetAppsRequest(t *testing.T, url string, params 
httprouter.Params, expected []*dao.ApplicationDAOInfo) {
+       req, err := http.NewRequest("GET", url, strings.NewReader(""))
+       req = req.WithContext(context.WithValue(req.Context(), 
httprouter.ParamsKey, params))
+       assert.NilError(t, err)
+       resp := &MockResponseWriter{}
+       var appsDao []*dao.ApplicationDAOInfo
+       getPartitionApplicationsByState(resp, req)
+       err = json.Unmarshal(resp.outputBytes, &appsDao)
+       assert.NilError(t, err, unmarshalError)
+       assert.Equal(t, len(appsDao), len(expected))
+}
+
+func checkIllegalGetAppsRequest(t *testing.T, url string, params 
httprouter.Params, assertFunc func(t *testing.T, resp *MockResponseWriter)) {
+       req, err := http.NewRequest("GET", url, strings.NewReader(""))
+       req = req.WithContext(context.WithValue(req.Context(), 
httprouter.ParamsKey, params))
+       assert.NilError(t, err)
+       resp := &MockResponseWriter{}
+       getPartitionApplicationsByState(resp, req)
+       assertFunc(t, resp)
+}
+
+func TestGetPartitionApplicationsByStateHandler(t *testing.T) {
+       defaultPartition := setup(t, configDefault, 1)
+       NewWebApp(schedulerContext, nil)
+
+       // add a new application
+       app1 := addApp(t, "app-1", defaultPartition, "root.default", false)
+       app1.SetState(objects.New.String())
+
+       // add a running application
+       app2 := addApp(t, "app-2", defaultPartition, "root.default", false)
+       app2.SetState(objects.Running.String())
+
+       // add a completed application
+       app3 := addApp(t, "app-3", defaultPartition, "root.default", true)
+
+       // add a rejected application
+       app4 := newApplication("app-4", defaultPartition.Name, "root.default", 
rmID, security.UserGroup{})
+       rejectedMessage := fmt.Sprintf("Failed to place application %s: 
application rejected: no placement rule matched", app3.ApplicationID)
+       defaultPartition.AddRejectedApplication(app3, rejectedMessage)
+
+       // test get active app
+       expectedActiveDao := []*dao.ApplicationDAOInfo{getApplicationDAO(app1), 
getApplicationDAO(app2)}
+       checkLegalGetAppsRequest(t, 
"/ws/v1/partition/default/applications/Active", httprouter.Params{
+               httprouter.Param{Key: "partition", Value: 
partitionNameWithoutClusterID},
+               httprouter.Param{Key: "state", Value: "Active"}}, 
expectedActiveDao)
+
+       // test get active app with running state
+       expectedRunningDao := []*dao.ApplicationDAOInfo{getApplicationDAO(app2)}
+       checkLegalGetAppsRequest(t, 
"/ws/v1/partition/default/applications/Active?status=Running", 
httprouter.Params{
+               httprouter.Param{Key: "partition", Value: 
partitionNameWithoutClusterID},
+               httprouter.Param{Key: "state", Value: "Active"}}, 
expectedRunningDao)
+
+       // test get completed app
+       expectedCompletedDao := 
[]*dao.ApplicationDAOInfo{getApplicationDAO(app3)}
+       checkLegalGetAppsRequest(t, 
"/ws/v1/partition/default/applications/Completed", httprouter.Params{
+               httprouter.Param{Key: "partition", Value: 
partitionNameWithoutClusterID},
+               httprouter.Param{Key: "state", Value: "Completed"}}, 
expectedCompletedDao)
+
+       // test get rejected app
+       expectedRejectedDao := 
[]*dao.ApplicationDAOInfo{getApplicationDAO(app4)}
+       checkLegalGetAppsRequest(t, 
"/ws/v1/partition/default/applications/Rejected", httprouter.Params{
+               httprouter.Param{Key: "partition", Value: 
partitionNameWithoutClusterID},
+               httprouter.Param{Key: "state", Value: "Rejected"}}, 
expectedRejectedDao)
+
+       // test nonexistent partition
+       checkIllegalGetAppsRequest(t, 
"/ws/v1/partition/default/applications/Active", httprouter.Params{
+               httprouter.Param{Key: "partition", Value: "notexists"},
+               httprouter.Param{Key: "state", Value: "Active"}}, 
assertPartitionExists)
+
+       // test disallow state
+       checkIllegalGetAppsRequest(t, 
"/ws/v1/partition/default/applications/Accepted", httprouter.Params{
+               httprouter.Param{Key: "partition", Value: 
partitionNameWithoutClusterID},
+               httprouter.Param{Key: "state", Value: "Accepted"}}, 
assertAppStateAllow)
+
+       // test disallow active state
+       checkIllegalGetAppsRequest(t, 
"/ws/v1/partition/default/applications/Active?status=invalid", 
httprouter.Params{
+               httprouter.Param{Key: "partition", Value: 
partitionNameWithoutClusterID},
+               httprouter.Param{Key: "state", Value: "Active"}}, 
assertActiveStateAllow)
+
+       // test missing params name
+       checkIllegalGetAppsRequest(t, 
"/ws/v1/partition/default/applications/Active", nil, assertParamsMissing)
+}
+
 func TestGetApplicationHandler(t *testing.T) {
        part := setup(t, configDefault, 1)
 
@@ -1275,6 +1365,38 @@ func TestGetApplicationHandler(t *testing.T) {
        getApplication(resp3, req3)
        assertApplicationExists(t, resp3)
 
+       // test without queue
+       var req4 *http.Request
+       req4, err = http.NewRequest("GET", 
"/ws/v1/partition/default/application/app-1", strings.NewReader(""))
+       req4 = req4.WithContext(context.WithValue(req.Context(), 
httprouter.ParamsKey, httprouter.Params{
+               httprouter.Param{Key: "partition", Value: 
partitionNameWithoutClusterID},
+               httprouter.Param{Key: "application", Value: "app-1"},
+       }))
+       assert.NilError(t, err, "Get Application Handler request failed")
+       resp4 := &MockResponseWriter{}
+       var appsDao4 *dao.ApplicationDAOInfo
+       getApplication(resp4, req4)
+       err = json.Unmarshal(resp4.outputBytes, &appsDao4)
+       assert.NilError(t, err, unmarshalError)
+
+       // test invalid queue name
+       var req5 *http.Request
+       req5, err = http.NewRequest("GET", 
"/ws/v1/partition/default/queue/root.default/application/app-1", 
strings.NewReader(""))
+       req5 = req5.WithContext(context.WithValue(req.Context(), 
httprouter.ParamsKey, httprouter.Params{
+               httprouter.Param{Key: "partition", Value: 
partitionNameWithoutClusterID},
+               httprouter.Param{Key: "queue", Value: "root.test.test123@"},
+               httprouter.Param{Key: "application", Value: "app-1"},
+       }))
+       assert.NilError(t, err, "Get Application Handler request failed")
+       resp5 := &MockResponseWriter{}
+       getApplication(resp5, req5)
+       var errInfo dao.YAPIError
+       err = json.Unmarshal(resp5.outputBytes, &errInfo)
+       assert.NilError(t, err, unmarshalError)
+       assert.Equal(t, http.StatusBadRequest, resp5.statusCode, 
statusCodeError)
+       assert.Equal(t, errInfo.Message, "problem in queue query parameter 
parsing as queue param root.test.test123@ contains invalid queue name test123@. 
Queue name must only have alphanumeric characters, - or _, and be no longer 
than 64 characters", jsonMessageError)
+       assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest)
+
        // test missing params name
        req, err = http.NewRequest("GET", 
"/ws/v1/partition/default/queue/root.default/application/app-1", 
strings.NewReader(""))
        assert.NilError(t, err, "Get Application Handler request failed")
@@ -1378,6 +1500,24 @@ func assertNodeIDExists(t *testing.T, resp 
*MockResponseWriter) {
        assert.Equal(t, errInfo.StatusCode, http.StatusNotFound)
 }
 
+func assertAppStateAllow(t *testing.T, resp *MockResponseWriter) {
+       var errInfo dao.YAPIError
+       err := json.Unmarshal(resp.outputBytes, &errInfo)
+       assert.NilError(t, err, unmarshalError)
+       assert.Equal(t, http.StatusBadRequest, resp.statusCode, statusCodeError)
+       assert.Equal(t, errInfo.Message, "Only following application states are 
allowed: active, rejected, completed", jsonMessageError)
+       assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest)
+}
+
+func assertActiveStateAllow(t *testing.T, resp *MockResponseWriter) {
+       var errInfo dao.YAPIError
+       err := json.Unmarshal(resp.outputBytes, &errInfo)
+       assert.NilError(t, err, unmarshalError)
+       assert.Equal(t, http.StatusBadRequest, resp.statusCode, statusCodeError)
+       assert.Equal(t, errInfo.Message, allowedActiveStatusMsg, 
jsonMessageError)
+       assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest)
+}
+
 func TestValidateQueue(t *testing.T) {
        err := validateQueue("root.test.test123")
        assert.NilError(t, err, "Queue path is correct but stil throwing 
error.")
diff --git a/pkg/webservice/routes.go b/pkg/webservice/routes.go
index 78957de0..75e2230f 100644
--- a/pkg/webservice/routes.go
+++ b/pkg/webservice/routes.go
@@ -122,6 +122,18 @@ var webRoutes = routes{
                
"/ws/v1/partition/:partition/queue/:queue/application/:application",
                getApplication,
        },
+       route{
+               "Scheduler",
+               "GET",
+               "/ws/v1/partition/:partition/application/:application",
+               getApplication,
+       },
+       route{
+               "Scheduler",
+               "GET",
+               "/ws/v1/partition/:partition/applications/:state",
+               getPartitionApplicationsByState,
+       },
        route{
                "Scheduler",
                "GET",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to