This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 440c224af55 AIP-84 Invalid states raises 422 status on get dag_runs
and task_instances endpoints (#44237)
440c224af55 is described below
commit 440c224af5592f9007eef43d1dbe9025aa34e177
Author: Kalyan R <[email protected]>
AuthorDate: Thu Nov 21 15:24:47 2024 +0530
AIP-84 Invalid states raises 422 status on get dag_runs and task_instances
endpoints (#44237)
* invalid state raises 422 for list ti's
* invalid state raises 422 for list dag_runs
* fix tests
---
airflow/api_fastapi/common/parameters.py | 22 +++++++++++++++++-----
.../core_api/routes/public/test_dag_run.py | 7 +++++--
.../core_api/routes/public/test_task_instances.py | 8 ++++++++
3 files changed, 30 insertions(+), 7 deletions(-)
diff --git a/airflow/api_fastapi/common/parameters.py
b/airflow/api_fastapi/common/parameters.py
index 6bfbfadf418..7554ee88450 100644
--- a/airflow/api_fastapi/common/parameters.py
+++ b/airflow/api_fastapi/common/parameters.py
@@ -33,7 +33,7 @@ from typing import (
overload,
)
-from fastapi import Depends, HTTPException, Query
+from fastapi import Depends, HTTPException, Query, status
from pendulum.parsing.exceptions import ParserError
from pydantic import AfterValidator, BaseModel, NonNegativeInt
from sqlalchemy import Column, case, or_
@@ -337,9 +337,15 @@ class
DagRunStateFilter(BaseParam[List[Optional[DagRunState]]]):
@staticmethod
def _convert_dag_run_states(states: Iterable[str] | None) ->
list[DagRunState | None] | None:
- if not states:
- return None
- return [None if s in ("none", None) else DagRunState(s) for s in
states]
+ try:
+ if not states:
+ return None
+ return [None if s in ("none", None) else DagRunState(s) for s in
states]
+ except ValueError:
+ raise HTTPException(
+ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+ detail=f"Invalid value for state. Valid values are {',
'.join(DagRunState)}",
+ )
def depends(self, state: list[str] = Query(default_factory=list)) ->
DagRunStateFilter:
states = self._convert_dag_run_states(state)
@@ -360,7 +366,13 @@ class
TIStateFilter(BaseParam[List[Optional[TaskInstanceState]]]):
return select.where(or_(*conditions))
def depends(self, state: list[str] = Query(default_factory=list)) ->
TIStateFilter:
- states = _convert_ti_states(state)
+ try:
+ states = _convert_ti_states(state)
+ except ValueError:
+ raise HTTPException(
+ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+ detail=f"Invalid value for state. Valid values are {',
'.join(TaskInstanceState)}",
+ )
return self.set_value(states)
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
index 2ac22a02e31..b3ce267bf52 100644
--- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -425,8 +425,11 @@ class TestGetDagRuns:
assert body["detail"] == expected_detail
def test_invalid_state(self, test_client):
- with pytest.raises(ValueError, match="'invalid' is not a valid
DagRunState"):
- test_client.get(f"/public/dags/{DAG1_ID}/dagRuns",
params={"state": "invalid"})
+ response = test_client.get(f"/public/dags/{DAG1_ID}/dagRuns",
params={"state": ["invalid"]})
+ assert response.status_code == 422
+ assert (
+ response.json()["detail"] == f"Invalid value for state. Valid
values are {', '.join(DagRunState)}"
+ )
class TestPatchDagRun:
diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py
b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
index f8e75600171..b3b4d0ffb1b 100644
--- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -956,6 +956,14 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint):
assert response.status_code == 404
assert response.json() == {"detail": "DagRun with run_id: `invalid`
was not found"}
+ def test_bad_state(self, test_client):
+ response = test_client.get("/public/dags/~/dagRuns/~/taskInstances",
params={"state": "invalid"})
+ assert response.status_code == 422
+ assert (
+ response.json()["detail"]
+ == f"Invalid value for state. Valid values are {',
'.join(TaskInstanceState)}"
+ )
+
@pytest.mark.xfail(reason="permissions not implemented yet.")
def test_return_TI_only_from_readable_dags(self, test_client, session):
task_instances = {