This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 77e19f6a [YUNIKORN-2235] Add new RESTful API for retrieving application (#750)
77e19f6a is described below
commit 77e19f6aca27966c3a83f14d0388ddeeb5e3cbe8
Author: Xie Yifan <[email protected]>
AuthorDate: Fri Jan 5 17:43:27 2024 +0800
[YUNIKORN-2235] Add new RESTful API for retrieving application (#750)
Closes: #750
Signed-off-by: Chia-Ping Tsai <[email protected]>
---
pkg/scheduler/objects/application_state.go | 1 +
pkg/scheduler/partition.go | 4 +
pkg/scheduler/partition_test.go | 17 ++++
pkg/webservice/handlers.go | 94 ++++++++++++++++---
pkg/webservice/handlers_test.go | 140 +++++++++++++++++++++++++++++
pkg/webservice/routes.go | 12 +++
6 files changed, 258 insertions(+), 10 deletions(-)
diff --git a/pkg/scheduler/objects/application_state.go
b/pkg/scheduler/objects/application_state.go
index 9ef98bf1..d07a72cc 100644
--- a/pkg/scheduler/objects/application_state.go
+++ b/pkg/scheduler/objects/application_state.go
@@ -60,6 +60,7 @@ func (ae applicationEvent) String() string {
// ----------------------------------
type applicationState int
+// Application states are used for filtering in the webservice handlers.
+// Please check and update that filtering logic if the state machine is modified.
const (
New applicationState = iota
Accepted
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 16cad537..018b8288 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -425,6 +425,10 @@ func (pc *PartitionContext) removeAppInternal(appID
string) *objects.Application
return app
}
+// GetApplication returns the application with the given ID from this partition, or nil when the
+// partition does not hold it. It is the exported entry point for getApplication, which performs
+// the lookup while holding the partition read lock.
+// NOTE(review): the webservice uses this for /ws/v1/partition/:partition/application/:application;
+// confirm whether completed/rejected applications are expected to be found by this lookup.
+func (pc *PartitionContext) GetApplication(appID string) *objects.Application {
+ return pc.getApplication(appID)
+}
+
func (pc *PartitionContext) getApplication(appID string) *objects.Application {
pc.RLock()
defer pc.RUnlock()
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 1565f5fb..3e59a3f2 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -1451,6 +1451,23 @@ func TestUpdateQueues(t *testing.T) {
assertUpdateQueues(t, "both", map[string]string{})
}
+func TestGetApplication(t *testing.T) {
+ partition, err := newBasePartition()
+ assert.NilError(t, err, "partition create failed")
+ app := newApplication(appID1, "default", defQueue)
+ err = partition.AddApplication(app)
+ assert.NilError(t, err, "no error expected while adding the
application")
+ assert.Equal(t, partition.GetApplication(appID1), app, "partition
failed to add app incorrect app returned")
+ app2 := newApplication(appID2, "default", "unknown")
+ err = partition.AddApplication(app2)
+ if err == nil {
+ t.Error("app-2 should not have been added to the partition")
+ }
+ if partition.GetApplication(appID2) != nil {
+ t.Fatal("partition added app incorrectly should have failed")
+ }
+}
+
func TestGetQueue(t *testing.T) {
// get the partition
partition, err := newBasePartition()
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index c159cb95..46bbb760 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -60,6 +60,27 @@ const (
NodeDoesNotExists = "Node not found"
)
+// allowedActiveStatusMsg is the error message returned when the "status" query parameter of the
+// active-applications listing is not one of allowedAppActiveStatuses.
+var allowedActiveStatusMsg string
+
+// allowedAppActiveStatuses holds the lower-cased application states accepted by the "status"
+// query parameter when listing active applications. Keep in sync with the application state
+// machine (pkg/scheduler/objects/application_state.go).
+var allowedAppActiveStatuses map[string]bool
+
+func init() {
+	// Build from an ordered slice rather than ranging over the map: map iteration order is
+	// unspecified, so the message listing would otherwise change from run to run.
+	activeStatuses := []string{"new", "accepted", "starting", "running", "completing", "failing", "resuming"}
+	allowedAppActiveStatuses = make(map[string]bool, len(activeStatuses))
+	for _, status := range activeStatuses {
+		allowedAppActiveStatuses[status] = true
+	}
+	allowedActiveStatusMsg = fmt.Sprintf("Only following active statuses are allowed: %s", strings.Join(activeStatuses, ","))
+}
+
func getStackInfo(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
var stack = func() []byte {
@@ -619,6 +640,54 @@ func getQueueApplications(w http.ResponseWriter, r
*http.Request) {
}
}
+// getPartitionApplicationsByState writes a JSON array of the applications in a partition,
+// selected by the case-insensitive "state" path parameter:
+//   - "active": partitionContext.GetApplications(), optionally narrowed by the case-insensitive
+//     "status" query parameter, which must be a key of allowedAppActiveStatuses
+//   - "rejected": partitionContext.GetRejectedApplications()
+//   - "completed": partitionContext.GetCompletedApplications()
+//
+// Responds 400 for missing route params or an unsupported state/status, 404 for an unknown
+// partition. An empty selection is encoded as [] rather than null.
+func getPartitionApplicationsByState(w http.ResponseWriter, r *http.Request) {
+	writeHeaders(w)
+	vars := httprouter.ParamsFromContext(r.Context())
+	if vars == nil {
+		buildJSONErrorResponse(w, MissingParamsName, http.StatusBadRequest)
+		return
+	}
+	partition := vars.ByName("partition")
+	appState := strings.ToLower(vars.ByName("state"))
+
+	partitionContext := schedulerContext.GetPartitionWithoutClusterID(partition)
+	if partitionContext == nil {
+		buildJSONErrorResponse(w, PartitionDoesNotExists, http.StatusNotFound)
+		return
+	}
+	var appList []*objects.Application
+	switch appState {
+	case "active":
+		status := strings.ToLower(r.URL.Query().Get("status"))
+		if status == "" {
+			appList = partitionContext.GetApplications()
+		} else {
+			if !allowedAppActiveStatuses[status] {
+				buildJSONErrorResponse(w, allowedActiveStatusMsg, http.StatusBadRequest)
+				return
+			}
+			for _, app := range partitionContext.GetApplications() {
+				// EqualFold compares without allocating a lower-cased copy per application
+				if strings.EqualFold(app.CurrentState(), status) {
+					appList = append(appList, app)
+				}
+			}
+		}
+	case "rejected":
+		appList = partitionContext.GetRejectedApplications()
+	case "completed":
+		appList = partitionContext.GetCompletedApplications()
+	default:
+		buildJSONErrorResponse(w, "Only following application states are allowed: active, rejected, completed", http.StatusBadRequest)
+		return
+	}
+	// non-nil slice so an empty result encodes as [] instead of null
+	appsDao := make([]*dao.ApplicationDAOInfo, 0, len(appList))
+	for _, app := range appList {
+		appsDao = append(appsDao, getApplicationDAO(app))
+	}
+	if err := json.NewEncoder(w).Encode(appsDao); err != nil {
+		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
+	}
+}
+
func getApplication(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
vars := httprouter.ParamsFromContext(r.Context())
@@ -629,22 +698,27 @@ func getApplication(w http.ResponseWriter, r
*http.Request) {
partition := vars.ByName("partition")
queueName := vars.ByName("queue")
application := vars.ByName("application")
- queueErr := validateQueue(queueName)
- if queueErr != nil {
- buildJSONErrorResponse(w, queueErr.Error(),
http.StatusBadRequest)
- return
- }
partitionContext :=
schedulerContext.GetPartitionWithoutClusterID(partition)
if partitionContext == nil {
buildJSONErrorResponse(w, PartitionDoesNotExists,
http.StatusNotFound)
return
}
- queue := partitionContext.GetQueue(queueName)
- if queue == nil {
- buildJSONErrorResponse(w, QueueDoesNotExists,
http.StatusNotFound)
- return
+ var app *objects.Application
+ if len(queueName) == 0 {
+ app = partitionContext.GetApplication(application)
+ } else {
+ queueErr := validateQueue(queueName)
+ if queueErr != nil {
+ buildJSONErrorResponse(w, queueErr.Error(),
http.StatusBadRequest)
+ return
+ }
+ queue := partitionContext.GetQueue(queueName)
+ if queue == nil {
+ buildJSONErrorResponse(w, QueueDoesNotExists,
http.StatusNotFound)
+ return
+ }
+ app = queue.GetApplication(application)
}
- app := queue.GetApplication(application)
if app == nil {
buildJSONErrorResponse(w, ApplicationDoesNotExists,
http.StatusNotFound)
return
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index 12c681b5..406261c5 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -1093,8 +1093,14 @@ func addAppWithUserGroup(t *testing.T, id string, part
*scheduler.PartitionConte
assert.Equal(t, 1+initSize, len(part.GetApplications()))
if isCompleted {
app.SetState(objects.Completing.String())
+ currentCount := len(part.GetCompletedApplications())
err = app.HandleApplicationEvent(objects.CompleteApplication)
assert.NilError(t, err, "The app should have completed")
+ err = common.WaitFor(10*time.Millisecond, time.Second, func()
bool {
+ newCount := len(part.GetCompletedApplications())
+ return newCount == currentCount+1
+ })
+ assert.NilError(t, err, "the completed application should have
been processed")
}
return app
}
@@ -1198,6 +1204,90 @@ func TestGetQueueApplicationsHandler(t *testing.T) {
assertParamsMissing(t, resp)
}
+// checkLegalGetAppsRequest calls getPartitionApplicationsByState with the given URL and route
+// params and asserts the response contains exactly the expected applications (matched by ID,
+// in any order).
+func checkLegalGetAppsRequest(t *testing.T, url string, params httprouter.Params, expected []*dao.ApplicationDAOInfo) {
+	req, err := http.NewRequest("GET", url, strings.NewReader(""))
+	// check the error before touching req: it is nil when NewRequest fails
+	assert.NilError(t, err)
+	req = req.WithContext(context.WithValue(req.Context(), httprouter.ParamsKey, params))
+	resp := &MockResponseWriter{}
+	getPartitionApplicationsByState(resp, req)
+	var appsDao []*dao.ApplicationDAOInfo
+	err = json.Unmarshal(resp.outputBytes, &appsDao)
+	assert.NilError(t, err, unmarshalError)
+	assert.Equal(t, len(appsDao), len(expected))
+	// the handler does not sort its output, so compare IDs as a set; deleting matched IDs also
+	// catches duplicates in the response
+	expectedIDs := make(map[string]struct{}, len(expected))
+	for _, app := range expected {
+		expectedIDs[app.ApplicationID] = struct{}{}
+	}
+	for _, app := range appsDao {
+		_, ok := expectedIDs[app.ApplicationID]
+		assert.Assert(t, ok, "unexpected application in response: %s", app.ApplicationID)
+		delete(expectedIDs, app.ApplicationID)
+	}
+}
+
+// checkIllegalGetAppsRequest calls getPartitionApplicationsByState with the given URL and route
+// params and hands the response to assertFunc for validation of the error.
+func checkIllegalGetAppsRequest(t *testing.T, url string, params httprouter.Params, assertFunc func(t *testing.T, resp *MockResponseWriter)) {
+	req, err := http.NewRequest("GET", url, strings.NewReader(""))
+	assert.NilError(t, err)
+	req = req.WithContext(context.WithValue(req.Context(), httprouter.ParamsKey, params))
+	resp := &MockResponseWriter{}
+	getPartitionApplicationsByState(resp, req)
+	assertFunc(t, resp)
+}
+
+func TestGetPartitionApplicationsByStateHandler(t *testing.T) {
+	defaultPartition := setup(t, configDefault, 1)
+	NewWebApp(schedulerContext, nil)
+
+	// add a new application
+	app1 := addApp(t, "app-1", defaultPartition, "root.default", false)
+	app1.SetState(objects.New.String())
+
+	// add a running application
+	app2 := addApp(t, "app-2", defaultPartition, "root.default", false)
+	app2.SetState(objects.Running.String())
+
+	// add a completed application
+	app3 := addApp(t, "app-3", defaultPartition, "root.default", true)
+
+	// add a rejected application: it must be app4, app3 is already the completed one
+	app4 := newApplication("app-4", defaultPartition.Name, "root.default", rmID, security.UserGroup{})
+	rejectedMessage := fmt.Sprintf("Failed to place application %s: application rejected: no placement rule matched", app4.ApplicationID)
+	defaultPartition.AddRejectedApplication(app4, rejectedMessage)
+
+	routeParams := func(partition, state string) httprouter.Params {
+		return httprouter.Params{
+			httprouter.Param{Key: "partition", Value: partition},
+			httprouter.Param{Key: "state", Value: state},
+		}
+	}
+
+	// test get active app
+	expectedActiveDao := []*dao.ApplicationDAOInfo{getApplicationDAO(app1), getApplicationDAO(app2)}
+	checkLegalGetAppsRequest(t, "/ws/v1/partition/default/applications/Active",
+		routeParams(partitionNameWithoutClusterID, "Active"), expectedActiveDao)
+
+	// test get active app with running state
+	expectedRunningDao := []*dao.ApplicationDAOInfo{getApplicationDAO(app2)}
+	checkLegalGetAppsRequest(t, "/ws/v1/partition/default/applications/Active?status=Running",
+		routeParams(partitionNameWithoutClusterID, "Active"), expectedRunningDao)
+
+	// test get completed app
+	expectedCompletedDao := []*dao.ApplicationDAOInfo{getApplicationDAO(app3)}
+	checkLegalGetAppsRequest(t, "/ws/v1/partition/default/applications/Completed",
+		routeParams(partitionNameWithoutClusterID, "Completed"), expectedCompletedDao)
+
+	// test get rejected app
+	expectedRejectedDao := []*dao.ApplicationDAOInfo{getApplicationDAO(app4)}
+	checkLegalGetAppsRequest(t, "/ws/v1/partition/default/applications/Rejected",
+		routeParams(partitionNameWithoutClusterID, "Rejected"), expectedRejectedDao)
+
+	// test nonexistent partition
+	checkIllegalGetAppsRequest(t, "/ws/v1/partition/default/applications/Active",
+		routeParams("notexists", "Active"), assertPartitionExists)
+
+	// test disallow state
+	checkIllegalGetAppsRequest(t, "/ws/v1/partition/default/applications/Accepted",
+		routeParams(partitionNameWithoutClusterID, "Accepted"), assertAppStateAllow)
+
+	// test disallow active state
+	checkIllegalGetAppsRequest(t, "/ws/v1/partition/default/applications/Active?status=invalid",
+		routeParams(partitionNameWithoutClusterID, "Active"), assertActiveStateAllow)
+
+	// test missing params name
+	checkIllegalGetAppsRequest(t, "/ws/v1/partition/default/applications/Active", nil, assertParamsMissing)
+}
+
func TestGetApplicationHandler(t *testing.T) {
part := setup(t, configDefault, 1)
@@ -1275,6 +1365,38 @@ func TestGetApplicationHandler(t *testing.T) {
getApplication(resp3, req3)
assertApplicationExists(t, resp3)
+ // test without queue
+ var req4 *http.Request
+ req4, err = http.NewRequest("GET",
"/ws/v1/partition/default/application/app-1", strings.NewReader(""))
+ req4 = req4.WithContext(context.WithValue(req.Context(),
httprouter.ParamsKey, httprouter.Params{
+ httprouter.Param{Key: "partition", Value:
partitionNameWithoutClusterID},
+ httprouter.Param{Key: "application", Value: "app-1"},
+ }))
+ assert.NilError(t, err, "Get Application Handler request failed")
+ resp4 := &MockResponseWriter{}
+ var appsDao4 *dao.ApplicationDAOInfo
+ getApplication(resp4, req4)
+ err = json.Unmarshal(resp4.outputBytes, &appsDao4)
+ assert.NilError(t, err, unmarshalError)
+
+ // test invalid queue name
+ var req5 *http.Request
+ req5, err = http.NewRequest("GET",
"/ws/v1/partition/default/queue/root.default/application/app-1",
strings.NewReader(""))
+ req5 = req5.WithContext(context.WithValue(req.Context(),
httprouter.ParamsKey, httprouter.Params{
+ httprouter.Param{Key: "partition", Value:
partitionNameWithoutClusterID},
+ httprouter.Param{Key: "queue", Value: "root.test.test123@"},
+ httprouter.Param{Key: "application", Value: "app-1"},
+ }))
+ assert.NilError(t, err, "Get Application Handler request failed")
+ resp5 := &MockResponseWriter{}
+ getApplication(resp5, req5)
+ var errInfo dao.YAPIError
+ err = json.Unmarshal(resp5.outputBytes, &errInfo)
+ assert.NilError(t, err, unmarshalError)
+ assert.Equal(t, http.StatusBadRequest, resp5.statusCode,
statusCodeError)
+ assert.Equal(t, errInfo.Message, "problem in queue query parameter
parsing as queue param root.test.test123@ contains invalid queue name test123@.
Queue name must only have alphanumeric characters, - or _, and be no longer
than 64 characters", jsonMessageError)
+ assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest)
+
// test missing params name
req, err = http.NewRequest("GET",
"/ws/v1/partition/default/queue/root.default/application/app-1",
strings.NewReader(""))
assert.NilError(t, err, "Get Application Handler request failed")
@@ -1378,6 +1500,24 @@ func assertNodeIDExists(t *testing.T, resp
*MockResponseWriter) {
assert.Equal(t, errInfo.StatusCode, http.StatusNotFound)
}
+// assertAppStateAllow checks that resp is a 400 error listing the allowed application states.
+func assertAppStateAllow(t *testing.T, resp *MockResponseWriter) {
+	assertGetAppsBadRequest(t, resp, "Only following application states are allowed: active, rejected, completed")
+}
+
+// assertActiveStateAllow checks that resp is a 400 error listing the allowed active statuses.
+func assertActiveStateAllow(t *testing.T, resp *MockResponseWriter) {
+	assertGetAppsBadRequest(t, resp, allowedActiveStatusMsg)
+}
+
+// assertGetAppsBadRequest decodes resp as a dao.YAPIError and checks that the HTTP status and the
+// embedded status code are both 400 and that the message equals wantMsg.
+func assertGetAppsBadRequest(t *testing.T, resp *MockResponseWriter, wantMsg string) {
+	var errInfo dao.YAPIError
+	decodeErr := json.Unmarshal(resp.outputBytes, &errInfo)
+	assert.NilError(t, decodeErr, unmarshalError)
+	assert.Equal(t, http.StatusBadRequest, resp.statusCode, statusCodeError)
+	assert.Equal(t, errInfo.Message, wantMsg, jsonMessageError)
+	assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest)
+}
+
func TestValidateQueue(t *testing.T) {
err := validateQueue("root.test.test123")
assert.NilError(t, err, "Queue path is correct but stil throwing
error.")
diff --git a/pkg/webservice/routes.go b/pkg/webservice/routes.go
index 78957de0..75e2230f 100644
--- a/pkg/webservice/routes.go
+++ b/pkg/webservice/routes.go
@@ -122,6 +122,18 @@ var webRoutes = routes{
"/ws/v1/partition/:partition/queue/:queue/application/:application",
getApplication,
},
+ route{
+ "Scheduler",
+ "GET",
+ "/ws/v1/partition/:partition/application/:application",
+ getApplication,
+ },
+ route{
+ "Scheduler",
+ "GET",
+ "/ws/v1/partition/:partition/applications/:state",
+ getPartitionApplicationsByState,
+ },
route{
"Scheduler",
"GET",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]