tirkarthi commented on issue #58983:
URL: https://github.com/apache/airflow/issues/58983#issuecomment-3605394996
Looks like an issue with cadwyn wrapping the non-coroutine methods as
`_CallableWrapper`. After applying the following changes, which define the
functions as async, `_AsyncCallableWrapper` is set instead and the test
`airflow-core/tests/unit/models/test_mappedoperator.py::test_one_to_many_as_teardown`
passes:
```diff
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 56111db979..0cc2e45b3a 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -101,7 +101,7 @@ log = structlog.get_logger(__name__)
},
response_model_exclude_unset=True,
)
-def ti_run(
+async def ti_run(
task_instance_id: UUID,
ti_run_payload: Annotated[TIEnterRunningPayload, Body()],
session: SessionDep,
@@ -341,7 +341,7 @@ def _get_upstream_map_indexes(
HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload
for the state transition"},
},
)
-def ti_update_state(
+async def ti_update_state(
task_instance_id: UUID,
ti_patch_payload: Annotated[TIStateUpdate, Body()],
session: SessionDep,
@@ -709,7 +709,7 @@ def ti_heartbeat(
},
},
)
-def ti_put_rtif(
+async def ti_put_rtif(
task_instance_id: UUID,
put_rtif_payload: Annotated[dict[str, JsonValue], Body()],
session: SessionDep,
@@ -739,7 +739,7 @@ def ti_put_rtif(
HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid
rendered_map_index value"},
},
)
-def ti_patch_rendered_map_index(
+async def ti_patch_rendered_map_index(
task_instance_id: UUID,
rendered_map_index: Annotated[str, Body()],
session: SessionDep,
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py
index ce8309341f..d31c7c7728 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py
@@ -134,7 +134,7 @@ class GetXcomFilterParams(BaseModel):
"/{dag_id}/{run_id}/{task_id}/{key}",
description="Get a single XCom Value",
)
-def get_xcom(
+async def get_xcom(
dag_id: str,
run_id: str,
task_id: str,
@@ -316,7 +316,7 @@ def get_mapped_xcom_by_slice(
"/{dag_id}/{run_id}/{task_id}/{key}",
status_code=status.HTTP_201_CREATED,
)
-def set_xcom(
+async def set_xcom(
dag_id: str,
run_id: str,
task_id: str,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]