dabla commented on code in PR #55289:
URL: https://github.com/apache/airflow/pull/55289#discussion_r2493413591
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py:
##########
@@ -385,39 +385,20 @@ def set_xcom(
# TODO: Can/should we check if a client _hasn't_ provided this for an
upstream of a mapped task? That
# means loading the serialized dag and that seems like a relatively costly
operation for minimal benefit
# (the mapped task would fail in a moment as it can't be expanded anyway.)
- from airflow.models.dagrun import DagRun
-
- if not run_id:
- raise HTTPException(status.HTTP_404_NOT_FOUND, f"Run with ID:
`{run_id}` was not found")
-
- dag_run_id = session.query(DagRun.id).filter_by(dag_id=dag_id,
run_id=run_id).scalar()
- if dag_run_id is None:
- raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG run not found on
DAG {dag_id} with ID {run_id}")
-
- # Remove duplicate XComs and insert a new one.
- session.execute(
- delete(XComModel).where(
- XComModel.key == key,
- XComModel.run_id == run_id,
- XComModel.task_id == task_id,
- XComModel.dag_id == dag_id,
- XComModel.map_index == map_index,
- )
- )
-
try:
# We expect serialised value from the caller - sdk, do not serialise
in here
- new = XComModel(
- dag_run_id=dag_run_id,
+ XComModel.set(
key=key,
value=value,
run_id=run_id,
task_id=task_id,
dag_id=dag_id,
map_index=map_index,
+ serialize=False,
+ session=session,
)
- session.add(new)
- session.flush()
+ except ValueError as e:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"{e}")
Review Comment:
Because it also already returns 404 in that case in main?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]