This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 542a30fe8ae [v3-2-test] add check for xcom permission when result is specified in query parameter (#64415) (#64488)
542a30fe8ae is described below
commit 542a30fe8ae424e534feff1d1d9f1cb8d0e9496d
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Mar 30 17:15:27 2026 +0200
[v3-2-test] add check for xcom permission when result is specified in query parameter (#64415) (#64488)
(cherry picked from commit 35ca494ace6ec7d4962c27f5fd7137097934c0b1)
Co-authored-by: Kevin Yang <[email protected]>
---
.../api_fastapi/core_api/routes/public/dag_run.py | 15 +++++++++-
.../core_api/routes/public/test_dag_run.py | 32 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 67f76307124..ff42238806b 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -33,7 +33,8 @@ from airflow.api.common.mark_tasks import (
set_dag_run_state_to_queued,
set_dag_run_state_to_success,
)
-from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
+from airflow.api_fastapi.app import get_auth_manager
+from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity, DagDetails
from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run, get_latest_version_of_dag
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
from airflow.api_fastapi.common.db.dag_runs import (
@@ -536,6 +537,7 @@ def wait_dag_run_until_finished(
dag_id: str,
dag_run_id: str,
session: SessionDep,
+ user: GetUserDep,
interval: Annotated[float, Query(gt=0.0, description="Seconds to wait between dag run state checks")],
result_task_ids: Annotated[
list[str] | None,
@@ -543,6 +545,17 @@ def wait_dag_run_until_finished(
] = None,
):
"Wait for a dag run until it finishes, and return its result(s)."
+ if result_task_ids:
+ if not get_auth_manager().is_authorized_dag(
+ method="GET",
+ access_entity=DagAccessEntity.XCOM,
+ details=DagDetails(id=dag_id),
+ user=user,
+ ):
+ raise HTTPException(
+ status.HTTP_403_FORBIDDEN,
+ "User is not authorized to read XCom data for this DAG",
+ )
if not session.scalar(select(1).where(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id)):
raise HTTPException(
status.HTTP_404_NOT_FOUND,
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 3a55532e3d7..2e80a3501fe 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -2079,3 +2079,35 @@ class TestWaitDagRun:
assert response.status_code == 200
data = response.json()
assert data == {"state": DagRunState.SUCCESS, "results": {"task_1": '"result_1"'}}
+
+ def test_should_respond_403_when_user_lacks_xcom_permission(self, test_client):
+ from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity, DagDetails
+
+ with mock.patch(
+ "airflow.api_fastapi.core_api.routes.public.dag_run.get_auth_manager",
+ autospec=True,
+ ) as mock_get_auth_manager:
+ mock_get_auth_manager.return_value.is_authorized_dag.return_value = False
+
+ response = test_client.get(
+ f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait",
+ params={"interval": "1", "result": "task_1"},
+ )
+
+ assert response.status_code == 403
+ mock_get_auth_manager.return_value.is_authorized_dag.assert_called_once_with(
+ method="GET",
+ access_entity=DagAccessEntity.XCOM,
+ details=DagDetails(id=DAG1_ID),
+ user=mock.ANY,
+ )
+
+ def test_should_respond_200_without_result_when_user_lacks_xcom_permission(self, test_client):
+ """Waiting without result parameter should not require XCom permissions."""
+ response = test_client.get(
+ f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait",
+ params={"interval": "1"},
+ )
+ assert response.status_code == 200
+ data = response.json()
+ assert data == {"state": DagRunState.SUCCESS}