This is an automated email from the ASF dual-hosted git repository.
jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6be0310f33e Replaced manual response descriptions with
create_openapi_http_exception_doc for Task Instance routes. This ensures
consistent OpenAPI documentation across the Execution API and removes redundant
hardcoded strings. (#62624)
6be0310f33e is described below
commit 6be0310f33e212c0a6dbcee8663c96dc73ee9697
Author: Henry Chen <[email protected]>
AuthorDate: Wed May 27 20:08:26 2026 +0800
Replaced manual response descriptions with
create_openapi_http_exception_doc for Task Instance routes. This ensures
consistent OpenAPI documentation across the Execution API and removes redundant
hardcoded strings. (#62624)
---
.../api_fastapi/core_api/openapi/exceptions.py | 23 ++++-
.../execution_api/routes/task_instances.py | 106 +++++++++++++--------
.../src/airflow/sdk/api/datamodels/_generated.py | 8 ++
3 files changed, 94 insertions(+), 43 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/exceptions.py
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/exceptions.py
index 747b057d121..82961eac297 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/exceptions.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/exceptions.py
@@ -17,6 +17,8 @@
from __future__ import annotations
+from collections.abc import Sequence
+
from airflow.api_fastapi.core_api.base import BaseModel
@@ -26,7 +28,9 @@ class HTTPExceptionResponse(BaseModel):
detail: str | dict
-def create_openapi_http_exception_doc(responses_status_code: list[int]) ->
dict:
+def create_openapi_http_exception_doc(
+ responses_status_code: Sequence[int | tuple[int, str]],
+) -> dict:
"""
Will create additional response example for errors raised by the endpoint.
@@ -34,8 +38,21 @@ def create_openapi_http_exception_doc(responses_status_code:
list[int]) -> dict:
raised by the endpoint implementation. This piece of documentation needs
to be kept
in sync with the endpoint code manually.
+ Each item can be either a status code or a ``(status_code, description)``
tuple.
Validation errors (i.e. 422) are natively added to the OpenAPI documentation
by FastAPI.
"""
- responses_status_code = sorted(responses_status_code)
+ openapi_responses: list[tuple[int, dict[str, str |
type[HTTPExceptionResponse]]]] = []
+ for response_status_code in responses_status_code:
+ response: dict[str, str | type[HTTPExceptionResponse]] = {"model":
HTTPExceptionResponse}
+
+ if isinstance(response_status_code, tuple):
+ status_code, description = response_status_code
+ response["description"] = description
+ else:
+ status_code = response_status_code
+
+ openapi_responses.append((status_code, response))
- return {status_code: {"model": HTTPExceptionResponse} for status_code in
responses_status_code}
+ return {
+ status_code: response for status_code, response in
sorted(openapi_responses, key=lambda item: item[0])
+ }
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index c2d67926a21..a0677c55701 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -48,6 +48,7 @@ from airflow.api_fastapi.common.dagbag import DagBagDep,
get_latest_version_of_d
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.types import UtcDateTime
from airflow.api_fastapi.compat import HTTP_422_UNPROCESSABLE_CONTENT
+from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
from airflow.api_fastapi.execution_api.datamodels.taskinstance import (
InactiveAssetsResponse,
PreviousTIResponse,
@@ -105,11 +106,13 @@ tracer = trace.get_tracer(__name__)
"/{task_instance_id}/run",
status_code=status.HTTP_200_OK,
dependencies=[Security(require_auth, scopes=["token:execution",
"token:workload"])],
- responses={
- status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"},
- status.HTTP_409_CONFLICT: {"description": "The TI is already in the
requested state"},
- HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload for
the state transition"},
- },
+ responses=create_openapi_http_exception_doc(
+ [
+ (status.HTTP_404_NOT_FOUND, "Task Instance not found"),
+ (status.HTTP_409_CONFLICT, "The TI is already in the requested
state"),
+ (HTTP_422_UNPROCESSABLE_CONTENT, "Invalid payload for the state
transition"),
+ ]
+ ),
response_model_exclude_unset=True,
)
def ti_run(
@@ -323,11 +326,16 @@ def ti_run(
@ti_id_router.patch(
"/{task_instance_id}/state",
status_code=status.HTTP_204_NO_CONTENT,
- responses={
- status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"},
- status.HTTP_409_CONFLICT: {"description": "The TI is already in the
requested state"},
- HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload for
the state transition"},
- },
+ responses=create_openapi_http_exception_doc(
+ [
+ (status.HTTP_404_NOT_FOUND, "Task Instance not found"),
+ (
+ status.HTTP_409_CONFLICT,
+ "The TI is already in the requested state",
+ ),
+ (HTTP_422_UNPROCESSABLE_CONTENT, "Invalid payload for the state
transition"),
+ ]
+ ),
)
def ti_update_state(
task_instance_id: UUID,
@@ -675,10 +683,12 @@ def _create_ti_state_update_query_and_update_state(
@ti_id_router.patch(
"/{task_instance_id}/skip-downstream",
status_code=status.HTTP_204_NO_CONTENT,
- responses={
- status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"},
- HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload for
the state transition"},
- },
+ responses=create_openapi_http_exception_doc(
+ [
+ (status.HTTP_404_NOT_FOUND, "Task Instance not found"),
+ (HTTP_422_UNPROCESSABLE_CONTENT, "Invalid payload for the state
transition"),
+ ]
+ ),
)
def ti_skip_downstream(
task_instance_id: UUID,
@@ -736,16 +746,20 @@ def ti_skip_downstream(
@ti_id_router.put(
"/{task_instance_id}/heartbeat",
status_code=status.HTTP_204_NO_CONTENT,
- responses={
- status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"},
- status.HTTP_409_CONFLICT: {
- "description": "The TI attempting to heartbeat should be
terminated for the given reason"
- },
- status.HTTP_410_GONE: {
- "description": "Task Instance not found in the TI table but exists
in the Task Instance History table"
- },
- HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload for
the state transition"},
- },
+ responses=create_openapi_http_exception_doc(
+ [
+ (status.HTTP_404_NOT_FOUND, "Task Instance not found"),
+ (
+ status.HTTP_409_CONFLICT,
+ "The TI attempting to heartbeat should be terminated for the
given reason",
+ ),
+ (
+ status.HTTP_410_GONE,
+ "Task Instance not found in the TI table but exists in the
Task Instance History table",
+ ),
+ (HTTP_422_UNPROCESSABLE_CONTENT, "Invalid payload for the state
transition"),
+ ]
+ ),
)
def ti_heartbeat(
task_instance_id: UUID,
@@ -855,12 +869,15 @@ def ti_heartbeat(
description="Store the rendered task instance fields (RTIF) for a task
instance. "
"These are the template fields after Jinja rendering has been applied. "
"Called by the worker after task execution begins.",
- responses={
- status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"},
- HTTP_422_UNPROCESSABLE_CONTENT: {
- "description": "Invalid payload for the setting rendered task
instance fields"
- },
- },
+ responses=create_openapi_http_exception_doc(
+ [
+ (status.HTTP_404_NOT_FOUND, "Task Instance not found"),
+ (
+ HTTP_422_UNPROCESSABLE_CONTENT,
+ "Invalid payload for the setting rendered task instance
fields",
+ ),
+ ]
+ ),
)
def ti_put_rtif(
task_instance_id: UUID,
@@ -886,10 +903,15 @@ def ti_put_rtif(
@ti_id_router.patch(
"/{task_instance_id}/rendered-map-index",
status_code=status.HTTP_204_NO_CONTENT,
- responses={
- status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"},
- HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid
rendered_map_index value"},
- },
+ responses=create_openapi_http_exception_doc(
+ [
+ (status.HTTP_404_NOT_FOUND, "Task Instance not found"),
+ (
+ HTTP_422_UNPROCESSABLE_CONTENT,
+ "Invalid rendered_map_index value",
+ ),
+ ]
+ ),
)
def ti_patch_rendered_map_index(
task_instance_id: UUID,
@@ -923,9 +945,11 @@ def ti_patch_rendered_map_index(
@ti_id_router.get(
"/{task_instance_id}/previous-successful-dagrun",
status_code=status.HTTP_200_OK,
- responses={
- status.HTTP_404_NOT_FOUND: {"description": "Task Instance or Dag Run
not found"},
- },
+ responses=create_openapi_http_exception_doc(
+ [
+ (status.HTTP_404_NOT_FOUND, "Task Instance or Dag Run not found"),
+ ]
+ ),
)
def get_previous_successful_dagrun(
task_instance_id: UUID, session: SessionDep
@@ -1189,9 +1213,11 @@ def _get_group_tasks(
@ti_id_router.get(
"/{task_instance_id}/validate-inlets-and-outlets",
status_code=status.HTTP_200_OK,
- responses={
- status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"},
- },
+ responses=create_openapi_http_exception_doc(
+ [
+ (status.HTTP_404_NOT_FOUND, "Task Instance not found"),
+ ]
+ ),
)
def validate_inlets_and_outlets(
task_instance_id: UUID,
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index 62c43ac17d1..2d999111eb0 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -157,6 +157,14 @@ class HITLUser(BaseModel):
name: Annotated[str, Field(title="Name")]
+class HTTPExceptionResponse(BaseModel):
+ """
+ HTTPException Model used for error response.
+ """
+
+ detail: Annotated[str | dict[str, Any], Field(title="Detail")]
+
+
class InactiveAssetsResponse(BaseModel):
"""
Response for inactive assets.