This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git
The following commit(s) were added to refs/heads/main by this push:
new 215e26f Add ability to select task status in the API
215e26f is described below
commit 215e26f2b407e2676a705377912cc5cc82821f29
Author: Sean B. Palmer <[email protected]>
AuthorDate: Wed Jul 2 18:50:26 2025 +0100
Add ability to select task status in the API
---
atr/blueprints/api/api.py | 26 +++++++++++++++++---------
1 file changed, 17 insertions(+), 9 deletions(-)
diff --git a/atr/blueprints/api/api.py b/atr/blueprints/api/api.py
index b2d775b..311f172 100644
--- a/atr/blueprints/api/api.py
+++ b/atr/blueprints/api/api.py
@@ -41,6 +41,11 @@ class Pagination:
limit: int = 20
+@dataclasses.dataclass
+class Task(Pagination):
+ status: str | None = None
+
+
# We implicitly have /api/openapi.json
@@ -79,19 +84,22 @@ async def projects_name_releases(name: str) -> tuple[list[Mapping], int]:
@api.BLUEPRINT.route("/tasks")
-@quart_schema.validate_querystring(Pagination)
-async def tasks(query_args: Pagination) -> quart.Response:
+@quart_schema.validate_querystring(Task)
+async def tasks(query_args: Task) -> quart.Response:
_pagination_args_validate(query_args)
via = models.validate_instrumented_attribute
async with db.session() as data:
- statement = (
- sqlmodel.select(models.Task)
- .limit(query_args.limit)
- .offset(query_args.offset)
- .order_by(via(models.Task.id).desc())
- )
+ statement = sqlmodel.select(models.Task).limit(query_args.limit).offset(query_args.offset)
+ if query_args.status:
+ if query_args.status not in models.TaskStatus:
+ raise exceptions.BadRequest(f"Invalid status: {query_args.status}")
+ statement = statement.where(models.Task.status == query_args.status)
+ statement = statement.order_by(via(models.Task.id).desc())
paged_tasks = (await data.execute(statement)).scalars().all()
- count = (await data.execute(sqlalchemy.select(sqlalchemy.func.count(via(models.Task.id))))).scalar_one()
+ count_statement = sqlalchemy.select(sqlalchemy.func.count(via(models.Task.id)))
+ if query_args.status:
+ count_statement = count_statement.where(via(models.Task.status) == query_args.status)
+ count = (await data.execute(count_statement)).scalar_one()
result = {"data": [paged_task.model_dump(exclude={"result"}) for paged_task in paged_tasks], "count": count}
return quart.jsonify(result)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]