This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new e9805c95 [YUNIKORN-2042] REST API for specific queue (#687)
e9805c95 is described below
commit e9805c956bb806437dcf103c0350af3756cc8ba1
Author: steinsgateted <[email protected]>
AuthorDate: Mon Feb 26 20:09:07 2024 +1100
[YUNIKORN-2042] REST API for specific queue (#687)
Expose a REST API for specific queue:
/ws/v1/partition/%s/queue/%s
/ws/v1/partition/%s/queue/%s?aubtree
The call takes one query parameter "subtree" if provided the whole tree
of queues rooted at the level requested in the call will be returned. If
"subtree" is not set only the queue requested will be returned
Closes: #687
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
pkg/scheduler/objects/queue.go | 16 ++++---
pkg/scheduler/objects/queue_test.go | 20 ++++-----
pkg/scheduler/partition.go | 2 +-
pkg/webservice/dao/queue_info.go | 1 +
pkg/webservice/handlers.go | 30 +++++++++++++
pkg/webservice/handlers_test.go | 90 +++++++++++++++++++++++++++++++------
pkg/webservice/routes.go | 6 +++
7 files changed, 136 insertions(+), 29 deletions(-)
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index ebf70e29..43c66e8b 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -624,17 +624,23 @@ func (sq *Queue) CheckAdminAccess(user
security.UserGroup) bool {
}
// GetPartitionQueueDAOInfo returns the queue hierarchy as an object for a
REST call.
-func (sq *Queue) GetPartitionQueueDAOInfo() dao.PartitionQueueDAOInfo {
+// Exclude is true, which means that returns the specified queue object, but
does not return the children of the specified queue.
+func (sq *Queue) GetPartitionQueueDAOInfo(exclude bool)
dao.PartitionQueueDAOInfo {
queueInfo := dao.PartitionQueueDAOInfo{}
- childes := sq.GetCopyOfChildren()
- queueInfo.Children = make([]dao.PartitionQueueDAOInfo, 0, len(childes))
- for _, child := range childes {
- queueInfo.Children = append(queueInfo.Children,
child.GetPartitionQueueDAOInfo())
+ children := sq.GetCopyOfChildren()
+ if !exclude {
+ queueInfo.Children = make([]dao.PartitionQueueDAOInfo, 0,
len(children))
+ for _, child := range children {
+ queueInfo.Children = append(queueInfo.Children,
child.GetPartitionQueueDAOInfo(false))
+ }
}
// we have held the read lock so following method should not take lock
again.
sq.RLock()
defer sq.RUnlock()
+ for _, child := range children {
+ queueInfo.ChildrenNames = append(queueInfo.ChildrenNames,
child.QueuePath)
+ }
queueInfo.QueueName = sq.QueuePath
queueInfo.Status = sq.stateMachine.Current()
queueInfo.PendingResource = sq.pending.DAOMap()
diff --git a/pkg/scheduler/objects/queue_test.go
b/pkg/scheduler/objects/queue_test.go
index 4c9e3303..7c46484e 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -1685,7 +1685,7 @@ func TestGetPartitionQueueDAOInfo(t *testing.T) {
// test properties
root.properties = getProperties()
- assert.DeepEqual(t, root.properties,
root.GetPartitionQueueDAOInfo().Properties)
+ assert.DeepEqual(t, root.properties,
root.GetPartitionQueueDAOInfo(false).Properties)
// test template
root.template, err = template.FromConf(&configs.ChildTemplate{
@@ -1697,23 +1697,23 @@ func TestGetPartitionQueueDAOInfo(t *testing.T) {
},
})
assert.NilError(t, err)
- assert.Equal(t, root.template.GetMaxApplications(),
root.GetPartitionQueueDAOInfo().TemplateInfo.MaxApplications)
- assert.DeepEqual(t, root.template.GetProperties(),
root.GetPartitionQueueDAOInfo().TemplateInfo.Properties)
- assert.DeepEqual(t, root.template.GetMaxResource().DAOMap(),
root.template.GetMaxResource().DAOMap())
- assert.DeepEqual(t, root.template.GetGuaranteedResource().DAOMap(),
root.template.GetGuaranteedResource().DAOMap())
+ assert.Equal(t, root.template.GetMaxApplications(),
root.GetPartitionQueueDAOInfo(false).TemplateInfo.MaxApplications)
+ assert.DeepEqual(t, root.template.GetProperties(),
root.GetPartitionQueueDAOInfo(false).TemplateInfo.Properties)
+ assert.DeepEqual(t, root.template.GetMaxResource().DAOMap(),
root.GetPartitionQueueDAOInfo(false).TemplateInfo.MaxResource)
+ assert.DeepEqual(t, root.template.GetGuaranteedResource().DAOMap(),
root.GetPartitionQueueDAOInfo(false).TemplateInfo.GuaranteedResource)
// test resources
root.maxResource = getResource(t)
root.guaranteedResource = getResource(t)
- assert.DeepEqual(t, root.GetMaxResource().DAOMap(),
root.GetPartitionQueueDAOInfo().MaxResource)
- assert.DeepEqual(t, root.GetGuaranteedResource().DAOMap(),
root.GetPartitionQueueDAOInfo().GuaranteedResource)
- assert.DeepEqual(t, root.getHeadRoom().DAOMap(),
root.GetPartitionQueueDAOInfo().HeadRoom)
+ assert.DeepEqual(t, root.GetMaxResource().DAOMap(),
root.GetPartitionQueueDAOInfo(false).MaxResource)
+ assert.DeepEqual(t, root.GetGuaranteedResource().DAOMap(),
root.GetPartitionQueueDAOInfo(false).GuaranteedResource)
+ assert.DeepEqual(t, root.getHeadRoom().DAOMap(),
root.GetPartitionQueueDAOInfo(false).HeadRoom)
// test allocatingAcceptedApps
root.allocatingAcceptedApps = getAllocatingAcceptedApps()
assert.Equal(t, len(root.allocatingAcceptedApps), 2,
"allocatingAcceptedApps size")
- assert.Equal(t,
len(root.GetPartitionQueueDAOInfo().AllocatingAcceptedApps), 1,
"AllocatingAcceptedApps size")
- assert.Equal(t,
root.GetPartitionQueueDAOInfo().AllocatingAcceptedApps[0], appID1)
+ assert.Equal(t,
len(root.GetPartitionQueueDAOInfo(false).AllocatingAcceptedApps), 1,
"AllocatingAcceptedApps size")
+ assert.Equal(t,
root.GetPartitionQueueDAOInfo(false).AllocatingAcceptedApps[0], appID1)
}
func getAllocatingAcceptedApps() map[string]bool {
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index ec84c9f0..c318c6b8 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -477,7 +477,7 @@ func (pc *PartitionContext) getQueueInternal(name string)
*objects.Queue {
// Get the queue info for the whole queue structure to pass to the webservice
func (pc *PartitionContext) GetPartitionQueues() dao.PartitionQueueDAOInfo {
- partitionQueueDAOInfo := pc.root.GetPartitionQueueDAOInfo()
+ partitionQueueDAOInfo := pc.root.GetPartitionQueueDAOInfo(false)
partitionQueueDAOInfo.Partition =
common.GetPartitionNameWithoutClusterID(pc.Name)
return partitionQueueDAOInfo
}
diff --git a/pkg/webservice/dao/queue_info.go b/pkg/webservice/dao/queue_info.go
index 2c2ee250..8f137dca 100644
--- a/pkg/webservice/dao/queue_info.go
+++ b/pkg/webservice/dao/queue_info.go
@@ -40,6 +40,7 @@ type PartitionQueueDAOInfo struct {
Parent string `json:"parent,omitempty"`
TemplateInfo *TemplateInfo
`json:"template,omitempty"`
Children []PartitionQueueDAOInfo
`json:"children,omitempty"`
+ ChildrenNames []string
`json:"childrenNames,omitempty"`
AbsUsedCapacity map[string]int64
`json:"absUsedCapacity,omitempty"`
MaxRunningApps uint64
`json:"maxRunningApps,omitempty"`
RunningApps uint64
`json:"runningApps,omitempty"`
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index 2d633460..a10bac53 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -642,6 +642,36 @@ func getPartitionQueues(w http.ResponseWriter, r
*http.Request) {
}
}
+func getPartitionQueue(w http.ResponseWriter, r *http.Request) {
+ writeHeaders(w)
+ vars := httprouter.ParamsFromContext(r.Context())
+ if vars == nil {
+ buildJSONErrorResponse(w, MissingParamsName,
http.StatusBadRequest)
+ return
+ }
+ partition := vars.ByName("partition")
+ partitionContext :=
schedulerContext.GetPartitionWithoutClusterID(partition)
+ if partitionContext == nil {
+ buildJSONErrorResponse(w, PartitionDoesNotExists,
http.StatusNotFound)
+ return
+ }
+ queueName := vars.ByName("queue")
+ queueErr := validateQueue(queueName)
+ if queueErr != nil {
+ buildJSONErrorResponse(w, queueErr.Error(),
http.StatusBadRequest)
+ return
+ }
+ queue := partitionContext.GetQueue(queueName)
+ if queue == nil {
+ buildJSONErrorResponse(w, QueueDoesNotExists,
http.StatusNotFound)
+ return
+ }
+ queueDao := queue.GetPartitionQueueDAOInfo(r.URL.Query().Has("subtree"))
+ if err := json.NewEncoder(w).Encode(queueDao); err != nil {
+ buildJSONErrorResponse(w, err.Error(),
http.StatusInternalServerError)
+ }
+}
+
func getPartitionNodes(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
vars := httprouter.ParamsFromContext(r.Context())
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index 9cf7633c..0442619a 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -1147,20 +1147,80 @@ func TestGetPartitionQueuesHandler(t *testing.T) {
assert.Equal(t, child.Properties[configs.ApplicationSortPolicy],
policies.FifoSortPolicy.String())
assert.Assert(t, child.TemplateInfo == nil)
- // Partition not exists
+ // test partition not exists
req, err = http.NewRequest("GET", "/ws/v1/partition/default/queues",
strings.NewReader(""))
req = req.WithContext(context.WithValue(req.Context(),
httprouter.ParamsKey, httprouter.Params{httprouter.Param{Key: "partition",
Value: "notexists"}}))
- assert.NilError(t, err, "Get Queues for PartitionQueues Handler request
failed")
+ assert.NilError(t, err)
resp = &MockResponseWriter{}
getPartitionQueues(resp, req)
assertPartitionNotExists(t, resp)
// test params name missing
req, err = http.NewRequest("GET", "/ws/v1/partition/default/queues",
strings.NewReader(""))
- assert.NilError(t, err, "Get Queues for PartitionQueues Handler request
failed")
+ assert.NilError(t, err)
resp = &MockResponseWriter{}
getPartitionQueues(resp, req)
assertParamsMissing(t, resp)
+
+ // test specific queue
+ var partitionQueueDao1 dao.PartitionQueueDAOInfo
+ req, err = http.NewRequest("GET",
"/ws/v1/partition/default/queue/root.a?subtree", strings.NewReader(""))
+ req = req.WithContext(context.WithValue(req.Context(),
httprouter.ParamsKey, httprouter.Params{httprouter.Param{Key: "partition",
Value: "default"}, httprouter.Param{Key: "queue", Value: "root.a"}}))
+ assert.NilError(t, err)
+ resp = &MockResponseWriter{}
+ getPartitionQueue(resp, req)
+ err = json.Unmarshal(resp.outputBytes, &partitionQueueDao1)
+ assert.NilError(t, err, unmarshalError)
+ assert.Equal(t, partitionQueueDao1.QueueName, "root.a")
+ assert.Equal(t, len(partitionQueueDao1.Children), 0)
+ assert.Equal(t, len(partitionQueueDao1.ChildrenNames), 1)
+ assert.Equal(t, partitionQueueDao1.ChildrenNames[0], "root.a.a1")
+
+ // test hierarchy queue
+ var partitionQueueDao2 dao.PartitionQueueDAOInfo
+ req, err = http.NewRequest("GET",
"/ws/v1/partition/default/queue/root.a", strings.NewReader(""))
+ req = req.WithContext(context.WithValue(req.Context(),
httprouter.ParamsKey, httprouter.Params{httprouter.Param{Key: "partition",
Value: "default"}, httprouter.Param{Key: "queue", Value: "root.a"}}))
+ assert.NilError(t, err)
+ resp = &MockResponseWriter{}
+ getPartitionQueue(resp, req)
+ err = json.Unmarshal(resp.outputBytes, &partitionQueueDao2)
+ assert.NilError(t, err, unmarshalError)
+ assert.Equal(t, partitionQueueDao2.QueueName, "root.a")
+ assert.Equal(t, len(partitionQueueDao2.Children), 1)
+ assert.Equal(t, len(partitionQueueDao2.ChildrenNames), 1)
+ assert.Equal(t, partitionQueueDao2.Children[0].QueueName, "root.a.a1")
+ assert.Equal(t, partitionQueueDao2.ChildrenNames[0], "root.a.a1")
+
+ // test partition not exists
+ req, err = http.NewRequest("GET",
"/ws/v1/partition/default/queue/root.a", strings.NewReader(""))
+ req = req.WithContext(context.WithValue(req.Context(),
httprouter.ParamsKey, httprouter.Params{httprouter.Param{Key: "partition",
Value: "notexists"}}))
+ assert.NilError(t, err)
+ resp = &MockResponseWriter{}
+ getPartitionQueue(resp, req)
+ assertPartitionNotExists(t, resp)
+
+ // test params name missing
+ req, err = http.NewRequest("GET",
"/ws/v1/partition/default/queue/root.a", strings.NewReader(""))
+ assert.NilError(t, err)
+ resp = &MockResponseWriter{}
+ getPartitionQueue(resp, req)
+ assertParamsMissing(t, resp)
+
+ // test invalid queue name
+ req, err = http.NewRequest("GET",
"/ws/v1/partition/default/queue/root.a", strings.NewReader(""))
+ req = req.WithContext(context.WithValue(req.Context(),
httprouter.ParamsKey, httprouter.Params{httprouter.Param{Key: "partition",
Value: "default"}, httprouter.Param{Key: "queue", Value: "root.notexists@"}}))
+ assert.NilError(t, err)
+ resp = &MockResponseWriter{}
+ getPartitionQueue(resp, req)
+ assertQueueInvalid(t, resp, "root.notexists@", "notexists@")
+
+ // test queue is not exists
+ req, err = http.NewRequest("GET",
"/ws/v1/partition/default/queue/root.a", strings.NewReader(""))
+ req = req.WithContext(context.WithValue(req.Context(),
httprouter.ParamsKey, httprouter.Params{httprouter.Param{Key: "partition",
Value: "default"}, httprouter.Param{Key: "queue", Value: "notexists"}}))
+ assert.NilError(t, err)
+ resp = &MockResponseWriter{}
+ getPartitionQueue(resp, req)
+ assertQueueNotExists(t, resp)
}
func TestGetClusterInfo(t *testing.T) {
@@ -1485,12 +1545,12 @@ func TestGetPartitionApplicationsByStateHandler(t
*testing.T) {
// test disallow state
checkIllegalGetAppsRequest(t,
"/ws/v1/partition/default/applications/Accepted", httprouter.Params{
httprouter.Param{Key: "partition", Value:
partitionNameWithoutClusterID},
- httprouter.Param{Key: "state", Value: "Accepted"}},
assertAppStateAllow)
+ httprouter.Param{Key: "state", Value: "Accepted"}},
assertAppStateNotAllow)
// test disallow active state
checkIllegalGetAppsRequest(t,
"/ws/v1/partition/default/applications/Active?status=invalid",
httprouter.Params{
httprouter.Param{Key: "partition", Value:
partitionNameWithoutClusterID},
- httprouter.Param{Key: "state", Value: "Active"}},
assertActiveStateAllow)
+ httprouter.Param{Key: "state", Value: "Active"}},
assertActiveStateNotAllow)
// test missing params name
checkIllegalGetAppsRequest(t,
"/ws/v1/partition/default/applications/Active", nil, assertParamsMissing)
@@ -1598,12 +1658,7 @@ func TestGetApplicationHandler(t *testing.T) {
assert.NilError(t, err, "Get Application Handler request failed")
resp5 := &MockResponseWriter{}
getApplication(resp5, req5)
- var errInfo dao.YAPIError
- err = json.Unmarshal(resp5.outputBytes, &errInfo)
- assert.NilError(t, err, unmarshalError)
- assert.Equal(t, http.StatusBadRequest, resp5.statusCode,
statusCodeError)
- assert.Equal(t, errInfo.Message, "problem in queue query parameter
parsing as queue param root.test.test123@ contains invalid queue name test123@.
Queue name must only have alphanumeric characters, - or _, and be no longer
than 64 characters", jsonMessageError)
- assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest)
+ assertQueueInvalid(t, resp5, "root.test.test123@", "test123@")
// test missing params name
req, err = http.NewRequest("GET",
"/ws/v1/partition/default/queue/root.default/application/app-1",
strings.NewReader(""))
@@ -1640,6 +1695,15 @@ func assertQueueNotExists(t *testing.T, resp
*MockResponseWriter) {
assert.Equal(t, errInfo.StatusCode, http.StatusNotFound)
}
+func assertQueueInvalid(t *testing.T, resp *MockResponseWriter,
invalidQueuePath string, invalidQueueName string) {
+ var errInfo dao.YAPIError
+ err := json.Unmarshal(resp.outputBytes, &errInfo)
+ assert.NilError(t, err, unmarshalError)
+ assert.Equal(t, http.StatusBadRequest, resp.statusCode, statusCodeError)
+ assert.Equal(t, errInfo.Message, "problem in queue query parameter
parsing as queue param "+invalidQueuePath+" contains invalid queue name
"+invalidQueueName+". Queue name must only have alphanumeric characters, - or
_, and be no longer than 64 characters", jsonMessageError)
+ assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest)
+}
+
func assertApplicationNotExists(t *testing.T, resp *MockResponseWriter) {
var errInfo dao.YAPIError
err := json.Unmarshal(resp.outputBytes, &errInfo)
@@ -1708,7 +1772,7 @@ func assertNodeIDNotExists(t *testing.T, resp
*MockResponseWriter) {
assert.Equal(t, errInfo.StatusCode, http.StatusNotFound)
}
-func assertAppStateAllow(t *testing.T, resp *MockResponseWriter) {
+func assertAppStateNotAllow(t *testing.T, resp *MockResponseWriter) {
var errInfo dao.YAPIError
err := json.Unmarshal(resp.outputBytes, &errInfo)
assert.NilError(t, err, unmarshalError)
@@ -1717,7 +1781,7 @@ func assertAppStateAllow(t *testing.T, resp
*MockResponseWriter) {
assert.Equal(t, errInfo.StatusCode, http.StatusBadRequest)
}
-func assertActiveStateAllow(t *testing.T, resp *MockResponseWriter) {
+func assertActiveStateNotAllow(t *testing.T, resp *MockResponseWriter) {
var errInfo dao.YAPIError
err := json.Unmarshal(resp.outputBytes, &errInfo)
assert.NilError(t, err, unmarshalError)
diff --git a/pkg/webservice/routes.go b/pkg/webservice/routes.go
index ba549315..f3653b25 100644
--- a/pkg/webservice/routes.go
+++ b/pkg/webservice/routes.go
@@ -98,6 +98,12 @@ var webRoutes = routes{
"/ws/v1/partition/:partition/queues",
getPartitionQueues,
},
+ route{
+ "Scheduler",
+ "GET",
+ "/ws/v1/partition/:partition/queue/:queue",
+ getPartitionQueue,
+ },
route{
"Scheduler",
"GET",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]