Copilot commented on code in PR #64577:
URL: https://github.com/apache/airflow/pull/64577#discussion_r3066476237
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -548,17 +548,18 @@ def wait_dag_run_until_finished(
] = None,
):
"Wait for a dag run until it finishes, and return its result(s)."
- if result_task_ids:
- if not get_auth_manager().is_authorized_dag(
- method="GET",
- access_entity=DagAccessEntity.XCOM,
- details=DagDetails(id=dag_id),
- user=user,
- ):
+ if not get_auth_manager().is_authorized_dag(
+ method="GET",
+ access_entity=DagAccessEntity.XCOM,
+ details=DagDetails(id=dag_id),
+ user=user,
+ ):
+ if result_task_ids:
raise HTTPException(
status.HTTP_403_FORBIDDEN,
"User is not authorized to read XCom data for this DAG",
)
+ result_task_ids = [] # Explicitly not returning any XCom results.
Review Comment:
Using `result_task_ids = []` to suppress results relies on how
`XComModel.get_many(..., task_ids=[])` interprets an empty list. If `get_many`
treats empty lists as falsy and skips filtering, this could unintentionally
return XCom data to an unauthorized user. To make the authorization boundary
explicit, avoid calling the XCom query path at all when unauthorized (e.g.,
pass an explicit `include_results` or `can_read_xcom` flag into the service,
or short-circuit result serialization for this request).
##########
airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py:
##########
@@ -49,12 +49,19 @@ async def _get_dag_run(self) -> DagRun:
return await
session.scalar(select(DagRun).filter_by(dag_id=self.dag_id, run_id=self.run_id))
async def _serialize_xcoms(self) -> dict[str, Any]:
- xcom_query = XComModel.get_many(
- run_id=self.run_id,
- key=XCOM_RETURN_KEY,
- task_ids=self.result_task_ids,
- dag_ids=self.dag_id,
- )
+ if self.result_task_ids is None: # Return dag-author-specified
results.
+ xcom_query = XComModel.get_many(
+ run_id=self.run_id,
Review Comment:
The default (DAG-authored) branch does not filter by `key=XCOM_RETURN_KEY`,
so it may return unrelated XComs (any key) as long as `dag_result` is true.
This can (a) leak non-return XCom payloads in the `results` field and (b)
produce ambiguous/multiple entries per task. Filter by `key=XCOM_RETURN_KEY` in
this branch as well, and keep `dag_result` as an additional constraint.
```suggestion
run_id=self.run_id,
key=XCOM_RETURN_KEY,
```
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py:
##########
@@ -160,7 +162,16 @@ def setup(request, dag_maker, session=None):
ti.task = task
ti.state = State.SUCCESS
session.merge(ti)
- ti.xcom_push("return_value", f"result_{i}")
+ XComModel.set(
+ key="return_value",
+ value=f"result_{i}",
+ task_id=ti.task_id,
+ dag_id=ti.dag_id,
+ run_id=ti.run_id,
+ map_index=ti.map_index,
+ dag_result=task.returns_dag_result,
+ session=session,
+ )
Review Comment:
Use `XCOM_RETURN_KEY` instead of the string literal `"return_value"` to
keep tests consistent with production code and avoid coupling to the literal
value.
##########
airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py:
##########
@@ -49,12 +49,19 @@ async def _get_dag_run(self) -> DagRun:
return await
session.scalar(select(DagRun).filter_by(dag_id=self.dag_id, run_id=self.run_id))
async def _serialize_xcoms(self) -> dict[str, Any]:
- xcom_query = XComModel.get_many(
- run_id=self.run_id,
- key=XCOM_RETURN_KEY,
- task_ids=self.result_task_ids,
- dag_ids=self.dag_id,
- )
+ if self.result_task_ids is None: # Return dag-author-specified
results.
+ xcom_query = XComModel.get_many(
+ run_id=self.run_id,
+ dag_ids=self.dag_id,
+ )
+ xcom_query = xcom_query.where(XComModel.dag_result == true())
Review Comment:
For boolean columns, `XComModel.dag_result.is_(True)` is clearer than an
equality comparison against `true()`: it reads directly as a boolean predicate
and removes the need to import `true` (`.is_(true())` also works, but still
requires that import).
##########
airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py:
##########
@@ -24,7 +24,7 @@
from typing import TYPE_CHECKING, Any
import attrs
-from sqlalchemy import select
+from sqlalchemy import select, true
Review Comment:
For boolean columns, `XComModel.dag_result.is_(True)` is clearer than an
equality comparison against `true()`: it reads directly as a boolean predicate
and removes the need to import `true` (`.is_(true())` also works, but still
requires that import).
##########
airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py:
##########
@@ -75,8 +82,8 @@ async def _serialize_response(self, dag_run: DagRun) -> str:
resp = {"state": dag_run.state}
if dag_run.state not in State.finished_dr_states:
return json.dumps(resp)
- if self.result_task_ids:
- resp["results"] = await self._serialize_xcoms()
+ if result_xcoms := await self._serialize_xcoms():
+ resp["results"] = result_xcoms
Review Comment:
This now queries XComs for every finished run response, even when the client
explicitly should not receive results (e.g., an authorization-suppressed empty
list) or when results are not requested. Consider guarding the call so that
`_serialize_xcoms()` runs only when either (a) `result_task_ids is None` (the
default, DAG-authored behavior) or (b) `result_task_ids` is a non-empty list,
and skip the query entirely when `result_task_ids == []`.
```suggestion
if self.result_task_ids is None or self.result_task_ids:
if result_xcoms := await self._serialize_xcoms():
resp["results"] = result_xcoms
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]