This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3742b1ea9ee Replace HTTP status code with named constants (#43663)
3742b1ea9ee is described below
commit 3742b1ea9ee82e8bf5cef142232b5cc47faafe89
Author: Kaxil Naik <[email protected]>
AuthorDate: Mon Nov 4 23:15:17 2024 +0000
Replace HTTP status code with named constants (#43663)
---
.../core_api/routes/public/backfills.py | 60 ++++++++++++----
.../core_api/routes/public/connections.py | 22 ++++--
.../api_fastapi/core_api/routes/public/dag_run.py | 51 +++++++++++---
.../core_api/routes/public/dag_sources.py | 16 +++--
.../core_api/routes/public/dag_warning.py | 7 +-
airflow/api_fastapi/core_api/routes/public/dags.py | 80 +++++++++++++++++-----
.../core_api/routes/public/event_logs.py | 8 ++-
.../api_fastapi/core_api/routes/public/pools.py | 54 +++++++++++----
.../core_api/routes/public/task_instances.py | 22 ++++--
.../core_api/routes/public/variables.py | 49 ++++++++++---
airflow/api_fastapi/core_api/routes/ui/assets.py | 6 +-
11 files changed, 289 insertions(+), 86 deletions(-)
diff --git a/airflow/api_fastapi/core_api/routes/public/backfills.py
b/airflow/api_fastapi/core_api/routes/public/backfills.py
index f6fe531d006..e9e93673b8a 100644
--- a/airflow/api_fastapi/core_api/routes/public/backfills.py
+++ b/airflow/api_fastapi/core_api/routes/public/backfills.py
@@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations
-from fastapi import Depends, HTTPException
+from fastapi import Depends, HTTPException, status
from sqlalchemy import select, update
from sqlalchemy.orm import Session
from typing_extensions import Annotated
@@ -47,7 +47,7 @@ backfills_router = AirflowRouter(tags=["Backfill"],
prefix="/backfills")
@backfills_router.get(
path="/",
- responses=create_openapi_http_exception_doc([401, 403]),
+ responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED,
status.HTTP_403_FORBIDDEN]),
)
async def list_backfills(
dag_id: str,
@@ -77,7 +77,9 @@ async def list_backfills(
@backfills_router.get(
path="/{backfill_id}",
- responses=create_openapi_http_exception_doc([401, 403, 404]),
+ responses=create_openapi_http_exception_doc(
+ [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN,
status.HTTP_404_NOT_FOUND]
+ ),
)
async def get_backfill(
backfill_id: str,
@@ -86,19 +88,26 @@ async def get_backfill(
backfill = session.get(Backfill, backfill_id)
if backfill:
return BackfillResponse.model_validate(backfill, from_attributes=True)
- raise HTTPException(404, "Backfill not found")
+ raise HTTPException(status.HTTP_404_NOT_FOUND, "Backfill not found")
@backfills_router.put(
path="/{backfill_id}/pause",
- responses=create_openapi_http_exception_doc([401, 403, 404, 409]),
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ status.HTTP_409_CONFLICT,
+ ]
+ ),
)
async def pause_backfill(*, backfill_id, session: Annotated[Session,
Depends(get_session)]):
b = session.get(Backfill, backfill_id)
if not b:
- raise HTTPException(404, f"Could not find backfill with id
{backfill_id}")
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Could not find
backfill with id {backfill_id}")
if b.completed_at:
- raise HTTPException(409, "Backfill is already completed.")
+ raise HTTPException(status.HTTP_409_CONFLICT, "Backfill is already
completed.")
if b.is_paused is False:
b.is_paused = True
session.commit()
@@ -107,14 +116,21 @@ async def pause_backfill(*, backfill_id, session:
Annotated[Session, Depends(get
@backfills_router.put(
path="/{backfill_id}/unpause",
- responses=create_openapi_http_exception_doc([401, 403, 404, 409]),
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ status.HTTP_409_CONFLICT,
+ ]
+ ),
)
async def unpause_backfill(*, backfill_id, session: Annotated[Session,
Depends(get_session)]):
b = session.get(Backfill, backfill_id)
if not b:
- raise HTTPException(404, f"Could not find backfill with id
{backfill_id}")
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Could not find
backfill with id {backfill_id}")
if b.completed_at:
- raise HTTPException(409, "Backfill is already completed.")
+ raise HTTPException(status.HTTP_409_CONFLICT, "Backfill is already
completed.")
if b.is_paused:
b.is_paused = False
return BackfillResponse.model_validate(b, from_attributes=True)
@@ -122,14 +138,21 @@ async def unpause_backfill(*, backfill_id, session:
Annotated[Session, Depends(g
@backfills_router.put(
path="/{backfill_id}/cancel",
- responses=create_openapi_http_exception_doc([401, 403, 404, 409]),
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ status.HTTP_409_CONFLICT,
+ ]
+ ),
)
async def cancel_backfill(*, backfill_id, session: Annotated[Session,
Depends(get_session)]):
b: Backfill = session.get(Backfill, backfill_id)
if not b:
- raise HTTPException(404, f"Could not find backfill with id
{backfill_id}")
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Could not find
backfill with id {backfill_id}")
if b.completed_at is not None:
- raise HTTPException(409, "Backfill is already completed.")
+ raise HTTPException(status.HTTP_409_CONFLICT, "Backfill is already
completed.")
# first, pause, and commit immediately to ensure no other dag runs are
started
if not b.is_paused:
@@ -162,7 +185,14 @@ async def cancel_backfill(*, backfill_id, session:
Annotated[Session, Depends(ge
@backfills_router.post(
path="/",
- responses=create_openapi_http_exception_doc([401, 403, 404, 409]),
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ status.HTTP_409_CONFLICT,
+ ]
+ ),
)
async def create_backfill(
backfill_request: BackfillPostBody,
@@ -182,6 +212,6 @@ async def create_backfill(
return BackfillResponse.model_validate(backfill_obj,
from_attributes=True)
except AlreadyRunningBackfill:
raise HTTPException(
- status_code=409,
+ status_code=status.HTTP_409_CONFLICT,
detail=f"There is already a running backfill for dag
{backfill_request.dag_id}",
)
diff --git a/airflow/api_fastapi/core_api/routes/public/connections.py
b/airflow/api_fastapi/core_api/routes/public/connections.py
index 60898d2a63a..8d9f9ddb8eb 100644
--- a/airflow/api_fastapi/core_api/routes/public/connections.py
+++ b/airflow/api_fastapi/core_api/routes/public/connections.py
@@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations
-from fastapi import Depends, HTTPException
+from fastapi import Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing_extensions import Annotated
@@ -37,7 +37,9 @@ connections_router = AirflowRouter(tags=["Connection"],
prefix="/connections")
@connections_router.delete(
"/{connection_id}",
status_code=204,
- responses=create_openapi_http_exception_doc([401, 403, 404]),
+ responses=create_openapi_http_exception_doc(
+ [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN,
status.HTTP_404_NOT_FOUND]
+ ),
)
async def delete_connection(
connection_id: str,
@@ -47,14 +49,18 @@ async def delete_connection(
connection =
session.scalar(select(Connection).filter_by(conn_id=connection_id))
if connection is None:
- raise HTTPException(404, f"The Connection with connection_id:
`{connection_id}` was not found")
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND, f"The Connection with connection_id:
`{connection_id}` was not found"
+ )
session.delete(connection)
@connections_router.get(
"/{connection_id}",
- responses=create_openapi_http_exception_doc([401, 403, 404]),
+ responses=create_openapi_http_exception_doc(
+ [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN,
status.HTTP_404_NOT_FOUND]
+ ),
)
async def get_connection(
connection_id: str,
@@ -64,14 +70,18 @@ async def get_connection(
connection =
session.scalar(select(Connection).filter_by(conn_id=connection_id))
if connection is None:
- raise HTTPException(404, f"The Connection with connection_id:
`{connection_id}` was not found")
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND, f"The Connection with connection_id:
`{connection_id}` was not found"
+ )
return ConnectionResponse.model_validate(connection, from_attributes=True)
@connections_router.get(
"/",
- responses=create_openapi_http_exception_doc([401, 403, 404]),
+ responses=create_openapi_http_exception_doc(
+ [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN,
status.HTTP_404_NOT_FOUND]
+ ),
)
async def get_connections(
limit: QueryLimit,
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py
b/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 02780d6088e..7f41573b1db 100644
--- a/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -17,7 +17,7 @@
from __future__ import annotations
-from fastapi import Depends, HTTPException, Query, Request
+from fastapi import Depends, HTTPException, Query, Request, status
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing_extensions import Annotated
@@ -40,21 +40,40 @@ from airflow.models import DAG, DagRun
dag_run_router = AirflowRouter(tags=["DagRun"],
prefix="/dags/{dag_id}/dagRuns")
-@dag_run_router.get("/{dag_run_id}",
responses=create_openapi_http_exception_doc([401, 403, 404]))
+@dag_run_router.get(
+ "/{dag_run_id}",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
async def get_dag_run(
dag_id: str, dag_run_id: str, session: Annotated[Session,
Depends(get_session)]
) -> DAGRunResponse:
dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id,
run_id=dag_run_id))
if dag_run is None:
raise HTTPException(
- 404, f"The DagRun with dag_id: `{dag_id}` and run_id:
`{dag_run_id}` was not found"
+ status.HTTP_404_NOT_FOUND,
+ f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}`
was not found",
)
return DAGRunResponse.model_validate(dag_run, from_attributes=True)
@dag_run_router.delete(
- "/{dag_run_id}", status_code=204,
responses=create_openapi_http_exception_doc([400, 401, 403, 404])
+ "/{dag_run_id}",
+ status_code=status.HTTP_204_NO_CONTENT,
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
)
async def delete_dag_run(dag_id: str, dag_run_id: str, session:
Annotated[Session, Depends(get_session)]):
"""Delete a DAG Run entry."""
@@ -62,13 +81,24 @@ async def delete_dag_run(dag_id: str, dag_run_id: str,
session: Annotated[Sessio
if dag_run is None:
raise HTTPException(
- 404, f"The DagRun with dag_id: `{dag_id}` and run_id:
`{dag_run_id}` was not found"
+ status.HTTP_404_NOT_FOUND,
+ f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}`
was not found",
)
session.delete(dag_run)
-@dag_run_router.patch("/{dag_run_id}",
responses=create_openapi_http_exception_doc([400, 401, 403, 404]))
+@dag_run_router.patch(
+ "/{dag_run_id}",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
async def patch_dag_run_state(
dag_id: str,
dag_run_id: str,
@@ -81,17 +111,20 @@ async def patch_dag_run_state(
dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id,
run_id=dag_run_id))
if dag_run is None:
raise HTTPException(
- 404, f"The DagRun with dag_id: `{dag_id}` and run_id:
`{dag_run_id}` was not found"
+ status.HTTP_404_NOT_FOUND,
+ f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}`
was not found",
)
dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
if not dag:
- raise HTTPException(404, f"Dag with id {dag_id} was not found")
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id}
was not found")
if update_mask:
if update_mask != ["state"]:
- raise HTTPException(400, "Only `state` field can be updated
through the REST API")
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST, "Only `state` field can be
updated through the REST API"
+ )
else:
update_mask = ["state"]
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_sources.py
b/airflow/api_fastapi/core_api/routes/public/dag_sources.py
index 2a660a7d026..3cf046f5b75 100644
--- a/airflow/api_fastapi/core_api/routes/public/dag_sources.py
+++ b/airflow/api_fastapi/core_api/routes/public/dag_sources.py
@@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations
-from fastapi import Depends, Header, HTTPException, Request, Response
+from fastapi import Depends, Header, HTTPException, Request, Response, status
from itsdangerous import BadSignature, URLSafeSerializer
from sqlalchemy.orm import Session
from typing_extensions import Annotated
@@ -37,7 +37,15 @@ mime_type_any = "*/*"
@dag_sources_router.get(
"/{file_token}",
responses={
- **create_openapi_http_exception_doc([400, 401, 403, 404, 406]),
+ **create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ status.HTTP_406_NOT_ACCEPTABLE,
+ ]
+ ),
"200": {
"description": "Successful Response",
"content": {
@@ -62,10 +70,10 @@ async def get_dag_source(
content=DagCode.code(path, session=session),
)
except (BadSignature, FileNotFoundError):
- raise HTTPException(404, "DAG source not found")
+ raise HTTPException(status.HTTP_404_NOT_FOUND, "DAG source not found")
if accept.startswith(mime_type_text):
return Response(dag_source_model.content, media_type=mime_type_text)
if accept.startswith(mime_type_json) or accept.startswith(mime_type_any):
return dag_source_model
- raise HTTPException(406, "Content not available for Accept header")
+ raise HTTPException(status.HTTP_406_NOT_ACCEPTABLE, "Content not available
for Accept header")
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_warning.py
b/airflow/api_fastapi/core_api/routes/public/dag_warning.py
index a388fae13be..f445fb0afcf 100644
--- a/airflow/api_fastapi/core_api/routes/public/dag_warning.py
+++ b/airflow/api_fastapi/core_api/routes/public/dag_warning.py
@@ -17,7 +17,7 @@
from __future__ import annotations
-from fastapi import Depends
+from fastapi import Depends, status
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing_extensions import Annotated
@@ -44,7 +44,10 @@ from airflow.models import DagWarning
dag_warning_router = AirflowRouter(tags=["DagWarning"])
-@dag_warning_router.get("/dagWarnings",
responses=create_openapi_http_exception_doc([401, 403]))
+@dag_warning_router.get(
+ "/dagWarnings",
+ responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED,
status.HTTP_403_FORBIDDEN]),
+)
async def list_dag_warnings(
dag_id: QueryDagIdInDagWarningFilter,
warning_type: QueryWarningTypeFilter,
diff --git a/airflow/api_fastapi/core_api/routes/public/dags.py
b/airflow/api_fastapi/core_api/routes/public/dags.py
index c7b753b5cdb..7d2ee68fa14 100644
--- a/airflow/api_fastapi/core_api/routes/public/dags.py
+++ b/airflow/api_fastapi/core_api/routes/public/dags.py
@@ -17,7 +17,7 @@
from __future__ import annotations
-from fastapi import Depends, HTTPException, Query, Request, Response
+from fastapi import Depends, HTTPException, Query, Request, Response, status
from sqlalchemy import select, update
from sqlalchemy.orm import Session
from typing_extensions import Annotated
@@ -99,7 +99,7 @@ async def get_dags(
@dags_router.get(
"/tags",
- responses=create_openapi_http_exception_doc([401, 403]),
+ responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED,
status.HTTP_403_FORBIDDEN]),
)
async def get_dag_tags(
limit: QueryLimit,
@@ -130,18 +130,29 @@ async def get_dag_tags(
return DAGTagCollectionResponse(tags=[dag_tag for dag_tag in dag_tags],
total_entries=total_entries)
-@dags_router.get("/{dag_id}",
responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422]))
+@dags_router.get(
+ "/{dag_id}",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ status.HTTP_422_UNPROCESSABLE_ENTITY,
+ ]
+ ),
+)
async def get_dag(
dag_id: str, session: Annotated[Session, Depends(get_session)], request:
Request
) -> DAGResponse:
"""Get basic information about a DAG."""
dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
if not dag:
- raise HTTPException(404, f"Dag with id {dag_id} was not found")
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id}
was not found")
dag_model: DagModel = session.get(DagModel, dag_id)
if not dag_model:
- raise HTTPException(404, f"Unable to obtain dag with id {dag_id} from
session")
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Unable to obtain dag
with id {dag_id} from session")
for key, value in dag.__dict__.items():
if not key.startswith("_") and not hasattr(dag_model, key):
@@ -157,11 +168,11 @@ async def get_dag_details(
"""Get details of DAG."""
dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
if not dag:
- raise HTTPException(404, f"Dag with id {dag_id} was not found")
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id}
was not found")
dag_model: DagModel = session.get(DagModel, dag_id)
if not dag_model:
- raise HTTPException(404, f"Unable to obtain dag with id {dag_id} from
session")
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Unable to obtain dag
with id {dag_id} from session")
for key, value in dag.__dict__.items():
if not key.startswith("_") and not hasattr(dag_model, key):
@@ -170,7 +181,17 @@ async def get_dag_details(
return DAGDetailsResponse.model_validate(dag_model, from_attributes=True)
-@dags_router.patch("/{dag_id}",
responses=create_openapi_http_exception_doc([400, 401, 403, 404]))
+@dags_router.patch(
+ "/{dag_id}",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
async def patch_dag(
dag_id: str,
patch_body: DAGPatchBody,
@@ -181,11 +202,13 @@ async def patch_dag(
dag = session.get(DagModel, dag_id)
if dag is None:
- raise HTTPException(404, f"Dag with id: {dag_id} was not found")
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id: {dag_id}
was not found")
if update_mask:
if update_mask != ["is_paused"]:
- raise HTTPException(400, "Only `is_paused` field can be updated
through the REST API")
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST, "Only `is_paused` field can be
updated through the REST API"
+ )
else:
update_mask = ["is_paused"]
@@ -197,7 +220,17 @@ async def patch_dag(
return DAGResponse.model_validate(dag, from_attributes=True)
-@dags_router.patch("/", responses=create_openapi_http_exception_doc([400, 401,
403, 404]))
+@dags_router.patch(
+ "/",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
async def patch_dags(
patch_body: DAGPatchBody,
limit: QueryLimit,
@@ -214,7 +247,9 @@ async def patch_dags(
"""Patch multiple DAGs."""
if update_mask:
if update_mask != ["is_paused"]:
- raise HTTPException(400, "Only `is_paused` field can be updated
through the REST API")
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST, "Only `is_paused` field can be
updated through the REST API"
+ )
else:
update_mask = ["is_paused"]
@@ -244,7 +279,18 @@ async def patch_dags(
)
-@dags_router.delete("/{dag_id}",
responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422]))
+@dags_router.delete(
+ "/{dag_id}",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ status.HTTP_422_UNPROCESSABLE_ENTITY,
+ ]
+ ),
+)
async def delete_dag(
dag_id: str,
session: Annotated[Session, Depends(get_session)],
@@ -253,7 +299,9 @@ async def delete_dag(
try:
delete_dag_module.delete_dag(dag_id, session=session)
except DagNotFound:
- raise HTTPException(404, f"Dag with id: {dag_id} was not found")
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id: {dag_id}
was not found")
except AirflowException:
- raise HTTPException(409, f"Task instances of dag with id: '{dag_id}'
are still running")
- return Response(status_code=204)
+ raise HTTPException(
+ status.HTTP_409_CONFLICT, f"Task instances of dag with id:
'{dag_id}' are still running"
+ )
+ return Response(status_code=status.HTTP_204_NO_CONTENT)
diff --git a/airflow/api_fastapi/core_api/routes/public/event_logs.py
b/airflow/api_fastapi/core_api/routes/public/event_logs.py
index 75f12cbefb0..537bb5ffe4d 100644
--- a/airflow/api_fastapi/core_api/routes/public/event_logs.py
+++ b/airflow/api_fastapi/core_api/routes/public/event_logs.py
@@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations
-from fastapi import Depends, HTTPException
+from fastapi import Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing_extensions import Annotated
@@ -36,7 +36,9 @@ event_logs_router = AirflowRouter(tags=["Event Log"],
prefix="/eventLogs")
@event_logs_router.get(
"/{event_log_id}",
- responses=create_openapi_http_exception_doc([401, 403, 404]),
+ responses=create_openapi_http_exception_doc(
+ [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN,
status.HTTP_404_NOT_FOUND]
+ ),
)
async def get_event_log(
event_log_id: int,
@@ -44,7 +46,7 @@ async def get_event_log(
) -> EventLogResponse:
event_log = session.scalar(select(Log).where(Log.id == event_log_id))
if event_log is None:
- raise HTTPException(404, f"The Event Log with id: `{event_log_id}` not
found")
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Event Log with
id: `{event_log_id}` not found")
return EventLogResponse.model_validate(
event_log,
from_attributes=True,
diff --git a/airflow/api_fastapi/core_api/routes/public/pools.py
b/airflow/api_fastapi/core_api/routes/public/pools.py
index 5690196e850..99389e0bd6e 100644
--- a/airflow/api_fastapi/core_api/routes/public/pools.py
+++ b/airflow/api_fastapi/core_api/routes/public/pools.py
@@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations
-from fastapi import Depends, HTTPException, Query
+from fastapi import Depends, HTTPException, Query, status
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from sqlalchemy import delete, select
@@ -41,8 +41,15 @@ pools_router = AirflowRouter(tags=["Pool"], prefix="/pools")
@pools_router.delete(
"/{pool_name}",
- status_code=204,
- responses=create_openapi_http_exception_doc([400, 401, 403, 404]),
+ status_code=status.HTTP_204_NO_CONTENT,
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
)
async def delete_pool(
pool_name: str,
@@ -50,17 +57,19 @@ async def delete_pool(
):
"""Delete a pool entry."""
if pool_name == "default_pool":
- raise HTTPException(400, "Default Pool can't be deleted")
+ raise HTTPException(status.HTTP_400_BAD_REQUEST, "Default Pool can't
be deleted")
affected_count = session.execute(delete(Pool).where(Pool.pool ==
pool_name)).rowcount
if affected_count == 0:
- raise HTTPException(404, f"The Pool with name: `{pool_name}` was not
found")
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Pool with name:
`{pool_name}` was not found")
@pools_router.get(
"/{pool_name}",
- responses=create_openapi_http_exception_doc([401, 403, 404]),
+ responses=create_openapi_http_exception_doc(
+ [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN,
status.HTTP_404_NOT_FOUND]
+ ),
)
async def get_pool(
pool_name: str,
@@ -69,14 +78,16 @@ async def get_pool(
"""Get a pool."""
pool = session.scalar(select(Pool).where(Pool.pool == pool_name))
if pool is None:
- raise HTTPException(404, f"The Pool with name: `{pool_name}` was not
found")
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Pool with name:
`{pool_name}` was not found")
return PoolResponse.model_validate(pool, from_attributes=True)
@pools_router.get(
"/",
- responses=create_openapi_http_exception_doc([401, 403, 404]),
+ responses=create_openapi_http_exception_doc(
+ [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN,
status.HTTP_404_NOT_FOUND]
+ ),
)
async def get_pools(
limit: QueryLimit,
@@ -105,7 +116,17 @@ async def get_pools(
)
-@pools_router.patch("/{pool_name}",
responses=create_openapi_http_exception_doc([400, 401, 403, 404]))
+@pools_router.patch(
+ "/{pool_name}",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
async def patch_pool(
pool_name: str,
patch_body: PoolPatchBody,
@@ -118,11 +139,16 @@ async def patch_pool(
if update_mask and all(mask.strip() in {"slots", "include_deferred"}
for mask in update_mask):
pass
else:
- raise HTTPException(400, "Only slots and included_deferred can be
modified on Default Pool")
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST,
+ "Only slots and included_deferred can be modified on Default
Pool",
+ )
pool = session.scalar(select(Pool).where(Pool.pool == pool_name).limit(1))
if not pool:
- raise HTTPException(404, detail=f"The Pool with name: `{pool_name}`
was not found")
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND, detail=f"The Pool with name:
`{pool_name}` was not found"
+ )
if update_mask:
data = patch_body.model_dump(include=set(update_mask), by_alias=True)
@@ -139,7 +165,11 @@ async def patch_pool(
return PoolResponse.model_validate(pool, from_attributes=True)
-@pools_router.post("/", status_code=201,
responses=create_openapi_http_exception_doc([401, 403]))
+@pools_router.post(
+ "/",
+ status_code=status.HTTP_201_CREATED,
+ responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED,
status.HTTP_403_FORBIDDEN]),
+)
async def post_pool(
post_body: PoolPostBody,
session: Annotated[Session, Depends(get_session)],
diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py
b/airflow/api_fastapi/core_api/routes/public/task_instances.py
index c9458e843af..df16c0bc450 100644
--- a/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -17,7 +17,7 @@
from __future__ import annotations
-from fastapi import Depends, HTTPException
+from fastapi import Depends, HTTPException, status
from sqlalchemy.orm import Session, joinedload
from sqlalchemy.sql import select
from typing_extensions import Annotated
@@ -33,7 +33,12 @@ task_instances_router = AirflowRouter(
)
-@task_instances_router.get("/{task_id}",
responses=create_openapi_http_exception_doc([401, 403, 404]))
+@task_instances_router.get(
+ "/{task_id}",
+ responses=create_openapi_http_exception_doc(
+ [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN,
status.HTTP_404_NOT_FOUND]
+ ),
+)
async def get_task_instance(
dag_id: str, dag_run_id: str, task_id: str, session: Annotated[Session,
Depends(get_session)]
) -> TaskInstanceResponse:
@@ -48,17 +53,22 @@ async def get_task_instance(
if task_instance is None:
raise HTTPException(
- 404,
+ status.HTTP_404_NOT_FOUND,
f"The Task Instance with dag_id: `{dag_id}`, run_id:
`{dag_run_id}` and task_id: `{task_id}` was not found",
)
if task_instance.map_index != -1:
- raise HTTPException(404, "Task instance is mapped, add the map_index
value to the URL")
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND, "Task instance is mapped, add the
map_index value to the URL"
+ )
return TaskInstanceResponse.model_validate(task_instance,
from_attributes=True)
@task_instances_router.get(
- "/{task_id}/{map_index}",
responses=create_openapi_http_exception_doc([401, 403, 404])
+ "/{task_id}/{map_index}",
+ responses=create_openapi_http_exception_doc(
+ [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN,
status.HTTP_404_NOT_FOUND]
+ ),
)
async def get_mapped_task_instance(
dag_id: str,
@@ -78,7 +88,7 @@ async def get_mapped_task_instance(
if task_instance is None:
raise HTTPException(
- 404,
+ status.HTTP_404_NOT_FOUND,
f"The Mapped Task Instance with dag_id: `{dag_id}`, run_id:
`{dag_run_id}`, task_id: `{task_id}`, and map_index: `{map_index}` was not
found",
)
diff --git a/airflow/api_fastapi/core_api/routes/public/variables.py
b/airflow/api_fastapi/core_api/routes/public/variables.py
index 6b834a6de75..5d2bf5a899d 100644
--- a/airflow/api_fastapi/core_api/routes/public/variables.py
+++ b/airflow/api_fastapi/core_api/routes/public/variables.py
@@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations
-from fastapi import Depends, HTTPException, Query
+from fastapi import Depends, HTTPException, Query, status
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing_extensions import Annotated
@@ -38,7 +38,9 @@ variables_router = AirflowRouter(tags=["Variable"],
prefix="/variables")
@variables_router.delete(
"/{variable_key}",
status_code=204,
- responses=create_openapi_http_exception_doc([401, 403, 404]),
+ responses=create_openapi_http_exception_doc(
+ [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN,
status.HTTP_404_NOT_FOUND]
+ ),
)
async def delete_variable(
variable_key: str,
@@ -46,10 +48,17 @@ async def delete_variable(
):
"""Delete a variable entry."""
if Variable.delete(variable_key, session) == 0:
- raise HTTPException(404, f"The Variable with key: `{variable_key}` was
not found")
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND, f"The Variable with key:
`{variable_key}` was not found"
+ )
-@variables_router.get("/{variable_key}",
responses=create_openapi_http_exception_doc([401, 403, 404]))
+@variables_router.get(
+ "/{variable_key}",
+ responses=create_openapi_http_exception_doc(
+ [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN,
status.HTTP_404_NOT_FOUND]
+ ),
+)
async def get_variable(
variable_key: str,
session: Annotated[Session, Depends(get_session)],
@@ -58,14 +67,16 @@ async def get_variable(
variable = session.scalar(select(Variable).where(Variable.key ==
variable_key).limit(1))
if variable is None:
- raise HTTPException(404, f"The Variable with key: `{variable_key}` was
not found")
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND, f"The Variable with key:
`{variable_key}` was not found"
+ )
return VariableResponse.model_validate(variable, from_attributes=True)
@variables_router.get(
"/",
- responses=create_openapi_http_exception_doc([401, 403]),
+ responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED,
status.HTTP_403_FORBIDDEN]),
)
async def get_variables(
limit: QueryLimit,
@@ -99,7 +110,17 @@ async def get_variables(
)
-@variables_router.patch("/{variable_key}",
responses=create_openapi_http_exception_doc([400, 401, 403, 404]))
+@variables_router.patch(
+ "/{variable_key}",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_401_UNAUTHORIZED,
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
async def patch_variable(
variable_key: str,
patch_body: VariableBody,
@@ -108,11 +129,15 @@ async def patch_variable(
) -> VariableResponse:
"""Update a variable by key."""
if patch_body.key != variable_key:
- raise HTTPException(400, "Invalid body, key from request body doesn't
match uri parameter")
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST, "Invalid body, key from request body
doesn't match uri parameter"
+ )
non_update_fields = {"key"}
variable =
session.scalar(select(Variable).filter_by(key=variable_key).limit(1))
if not variable:
- raise HTTPException(404, f"The Variable with key: `{variable_key}` was
not found")
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND, f"The Variable with key:
`{variable_key}` was not found"
+ )
if update_mask:
data = patch_body.model_dump(include=set(update_mask) -
non_update_fields)
else:
@@ -122,7 +147,11 @@ async def patch_variable(
return variable
-@variables_router.post("/", status_code=201,
responses=create_openapi_http_exception_doc([401, 403]))
+@variables_router.post(
+ "/",
+ status_code=status.HTTP_201_CREATED,
+ responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED,
status.HTTP_403_FORBIDDEN]),
+)
async def post_variable(
post_body: VariableBody,
session: Annotated[Session, Depends(get_session)],
diff --git a/airflow/api_fastapi/core_api/routes/ui/assets.py
b/airflow/api_fastapi/core_api/routes/ui/assets.py
index 6786bc30ae6..3b98e4f59a3 100644
--- a/airflow/api_fastapi/core_api/routes/ui/assets.py
+++ b/airflow/api_fastapi/core_api/routes/ui/assets.py
@@ -17,7 +17,7 @@
from __future__ import annotations
-from fastapi import Depends, HTTPException, Request
+from fastapi import Depends, HTTPException, Request, status
from sqlalchemy import and_, func, select
from sqlalchemy.orm import Session
from typing_extensions import Annotated
@@ -39,12 +39,12 @@ async def next_run_assets(
dag = request.app.state.dag_bag.get_dag(dag_id)
if not dag:
- raise HTTPException(404, f"can't find dag {dag_id}")
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"can't find dag
{dag_id}")
dag_model = DagModel.get_dagmodel(dag_id, session=session)
if dag_model is None:
- raise HTTPException(404, f"can't find associated dag_model {dag_id}")
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"can't find associated
dag_model {dag_id}")
latest_run = dag_model.get_last_dagrun(session=session)