This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ec3f008f0b1 Return dag-specified results in dag run wait API (#64577)
ec3f008f0b1 is described below

commit ec3f008f0b161f2a8cf52eef0bfc2a63711993e1
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Sun Jun 14 09:07:27 2026 +0800

    Return dag-specified results in dag run wait API (#64577)
---
 .../core_api/openapi/v2-rest-api-generated.yaml    |  8 ++-
 .../api_fastapi/core_api/routes/public/dag_run.py  | 24 +++++---
 .../core_api/services/public/dag_run.py            | 25 +++++---
 .../ui/openapi-gen/queries/ensureQueryData.ts      |  4 +-
 .../src/airflow/ui/openapi-gen/queries/prefetch.ts |  4 +-
 .../src/airflow/ui/openapi-gen/queries/queries.ts  |  4 +-
 .../src/airflow/ui/openapi-gen/queries/suspense.ts |  4 +-
 .../ui/openapi-gen/requests/services.gen.ts        |  4 +-
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |  2 +-
 .../core_api/routes/public/test_dag_run.py         | 67 ++++++++++++++++++----
 10 files changed, 105 insertions(+), 41 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 96f5bdec1ff..5e28d7be8ce 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -2980,9 +2980,13 @@ paths:
             items:
               type: string
           - type: 'null'
-          description: Collect result XCom from task. Can be set multiple 
times.
+          description: Collect result XCom from task. Can be set multiple 
times. If
+            unset, return value of the return task as specified in the dag (in 
present)
+            is returned by default.
           title: Result
-        description: Collect result XCom from task. Can be set multiple times.
+        description: Collect result XCom from task. Can be set multiple times. 
If
+          unset, return value of the return task as specified in the dag (in 
present)
+          is returned by default.
       responses:
         '200':
           description: Successful Response
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 784e549f1be..0860b29da94 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -703,21 +703,29 @@ def wait_dag_run_until_finished(
     interval: Annotated[float, Query(gt=0.0, description="Seconds to wait 
between dag run state checks")],
     result_task_ids: Annotated[
         list[str] | None,
-        Query(alias="result", description="Collect result XCom from task. Can 
be set multiple times."),
+        Query(
+            alias="result",
+            description=(
+                "Collect result XCom from task. Can be set multiple times. "
+                "If unset, return value of the return task as specified in the 
"
+                "dag (in present) is returned by default."
+            ),
+        ),
     ] = None,
 ):
     "Wait for a dag run until it finishes, and return its result(s)."
-    if result_task_ids:
-        if not get_auth_manager().is_authorized_dag(
-            method="GET",
-            access_entity=DagAccessEntity.XCOM,
-            details=DagDetails(id=dag_id),
-            user=user,
-        ):
+    if not get_auth_manager().is_authorized_dag(
+        method="GET",
+        access_entity=DagAccessEntity.XCOM,
+        details=DagDetails(id=dag_id),
+        user=user,
+    ):
+        if result_task_ids:
             raise HTTPException(
                 status.HTTP_403_FORBIDDEN,
                 "User is not authorized to read XCom data for this Dag",
             )
+        result_task_ids = []  # Explicitly not returning any XCom results.
     if not session.scalar(select(1).where(DagRun.dag_id == dag_id, 
DagRun.run_id == dag_run_id)):
         raise HTTPException(
             status.HTTP_404_NOT_FOUND,
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
index d063493a994..c473b908b0e 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
@@ -200,12 +200,20 @@ class DagRunWaiter:
             return await 
session.scalar(select(DagRun).filter_by(dag_id=self.dag_id, run_id=self.run_id))
 
     async def _serialize_xcoms(self) -> dict[str, Any]:
-        xcom_query = XComModel.get_many(
-            run_id=self.run_id,
-            key=XCOM_RETURN_KEY,
-            task_ids=self.result_task_ids,
-            dag_ids=self.dag_id,
-        )
+        if self.result_task_ids is None:  # Return dag-author-specified 
results.
+            xcom_query = XComModel.get_many(
+                run_id=self.run_id,
+                key=XCOM_RETURN_KEY,
+                dag_ids=self.dag_id,
+            )
+            xcom_query = xcom_query.where(XComModel.dag_result.is_(True))
+        else:  # Explicitly API user-specified results.
+            xcom_query = XComModel.get_many(
+                run_id=self.run_id,
+                key=XCOM_RETURN_KEY,
+                task_ids=self.result_task_ids,
+                dag_ids=self.dag_id,
+            )
         async with create_session_async() as session:
             xcom_results = (
                 await session.scalars(xcom_query.order_by(XComModel.task_id, 
XComModel.map_index))
@@ -226,8 +234,9 @@ class DagRunWaiter:
         resp = {"state": dag_run.state}
         if dag_run.state not in State.finished_dr_states:
             return json.dumps(resp)
-        if self.result_task_ids:
-            resp["results"] = await self._serialize_xcoms()
+        if self.result_task_ids is None or self.result_task_ids:
+            if result_xcoms := await self._serialize_xcoms():
+                resp["results"] = result_xcoms
         return json.dumps(resp)
 
     async def wait(self) -> AsyncGenerator[str, None]:
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
index 48f61be34d1..69cc0d40363 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
@@ -408,7 +408,7 @@ export const 
ensureUseDagRunServiceGetUpstreamAssetEventsData = (queryClient: Qu
 * @param data.dagId
 * @param data.dagRunId
 * @param data.interval Seconds to wait between dag run state checks
-* @param data.result Collect result XCom from task. Can be set multiple times.
+* @param data.result Collect result XCom from task. Can be set multiple times. 
If unset, return value of the return task as specified in the dag (in present) 
is returned by default.
 * @returns unknown Successful Response
 * @throws ApiError
 */
@@ -438,7 +438,7 @@ export const ensureUseDagRunServiceGetDagRunStatsData = 
(queryClient: QueryClien
 * @param data.dagId
 * @param data.dagRunId
 * @param data.interval Seconds to wait between dag run state checks
-* @param data.result Collect result XCom from task. Can be set multiple times.
+* @param data.result Collect result XCom from task. Can be set multiple times. 
If unset, return value of the return task as specified in the dag (in present) 
is returned by default.
 * @returns unknown Successful Response
 * @throws ApiError
 */
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
index d4fb4ae8896..e1d1529ccf2 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -408,7 +408,7 @@ export const prefetchUseDagRunServiceGetUpstreamAssetEvents 
= (queryClient: Quer
 * @param data.dagId
 * @param data.dagRunId
 * @param data.interval Seconds to wait between dag run state checks
-* @param data.result Collect result XCom from task. Can be set multiple times.
+* @param data.result Collect result XCom from task. Can be set multiple times. 
If unset, return value of the return task as specified in the dag (in present) 
is returned by default.
 * @returns unknown Successful Response
 * @throws ApiError
 */
@@ -438,7 +438,7 @@ export const prefetchUseDagRunServiceGetDagRunStats = 
(queryClient: QueryClient,
 * @param data.dagId
 * @param data.dagRunId
 * @param data.interval Seconds to wait between dag run state checks
-* @param data.result Collect result XCom from task. Can be set multiple times.
+* @param data.result Collect result XCom from task. Can be set multiple times. 
If unset, return value of the return task as specified in the dag (in present) 
is returned by default.
 * @returns unknown Successful Response
 * @throws ApiError
 */
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index 91662271aca..f3b16e9835d 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -408,7 +408,7 @@ export const useDagRunServiceGetUpstreamAssetEvents = 
<TData = Common.DagRunServ
 * @param data.dagId
 * @param data.dagRunId
 * @param data.interval Seconds to wait between dag run state checks
-* @param data.result Collect result XCom from task. Can be set multiple times.
+* @param data.result Collect result XCom from task. Can be set multiple times. 
If unset, return value of the return task as specified in the dag (in present) 
is returned by default.
 * @returns unknown Successful Response
 * @throws ApiError
 */
@@ -438,7 +438,7 @@ export const useDagRunServiceGetDagRunStats = <TData = 
Common.DagRunServiceGetDa
 * @param data.dagId
 * @param data.dagRunId
 * @param data.interval Seconds to wait between dag run state checks
-* @param data.result Collect result XCom from task. Can be set multiple times.
+* @param data.result Collect result XCom from task. Can be set multiple times. 
If unset, return value of the return task as specified in the dag (in present) 
is returned by default.
 * @returns unknown Successful Response
 * @throws ApiError
 */
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
index b810526782c..49d32f8b70c 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
@@ -408,7 +408,7 @@ export const useDagRunServiceGetUpstreamAssetEventsSuspense 
= <TData = Common.Da
 * @param data.dagId
 * @param data.dagRunId
 * @param data.interval Seconds to wait between dag run state checks
-* @param data.result Collect result XCom from task. Can be set multiple times.
+* @param data.result Collect result XCom from task. Can be set multiple times. 
If unset, return value of the return task as specified in the dag (in present) 
is returned by default.
 * @returns unknown Successful Response
 * @throws ApiError
 */
@@ -438,7 +438,7 @@ export const useDagRunServiceGetDagRunStatsSuspense = 
<TData = Common.DagRunServ
 * @param data.dagId
 * @param data.dagRunId
 * @param data.interval Seconds to wait between dag run state checks
-* @param data.result Collect result XCom from task. Can be set multiple times.
+* @param data.result Collect result XCom from task. Can be set multiple times. 
If unset, return value of the return task as specified in the dag (in present) 
is returned by default.
 * @returns unknown Successful Response
 * @throws ApiError
 */
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index f8205d72084..453df4666cd 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -1228,7 +1228,7 @@ export class DagRunService {
      * @param data.dagId
      * @param data.dagRunId
      * @param data.interval Seconds to wait between dag run state checks
-     * @param data.result Collect result XCom from task. Can be set multiple 
times.
+     * @param data.result Collect result XCom from task. Can be set multiple 
times. If unset, return value of the return task as specified in the dag (in 
present) is returned by default.
      * @returns unknown Successful Response
      * @throws ApiError
      */
@@ -1342,7 +1342,7 @@ export class ExperimentalService {
      * @param data.dagId
      * @param data.dagRunId
      * @param data.interval Seconds to wait between dag run state checks
-     * @param data.result Collect result XCom from task. Can be set multiple 
times.
+     * @param data.result Collect result XCom from task. Can be set multiple 
times. If unset, return value of the return task as specified in the dag (in 
present) is returned by default.
      * @returns unknown Successful Response
      * @throws ApiError
      */
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 27ff0f96656..fd34703d468 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -3092,7 +3092,7 @@ export type WaitDagRunUntilFinishedData = {
      */
     interval: number;
     /**
-     * Collect result XCom from task. Can be set multiple times.
+     * Collect result XCom from task. Can be set multiple times. If unset, 
return value of the return task as specified in the dag (in present) is 
returned by default.
      */
     result?: Array<(string)> | null;
 };
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 4dfb5bb889a..653d7ba8d8f 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -40,6 +40,7 @@ from airflow.models.asset import AssetEvent, AssetModel
 from airflow.models.dagbundle import DagBundleModel
 from airflow.models.taskinstance import TaskInstance
 from airflow.models.team import Team
+from airflow.models.xcom import XComModel
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.sdk.definitions.asset import Asset
 from airflow.sdk.definitions.param import Param
@@ -151,6 +152,7 @@ def setup(request, dag_maker, *, session=None):
     with dag_maker(DAG1_ID, schedule=None, start_date=START_DATE1, 
serialized=True):
         task1 = EmptyOperator(task_id="task_1")
         task2 = EmptyOperator(task_id="task_2")
+        dag_maker.dag.add_result(task2.output)
 
     dag_run1 = dag_maker.create_dagrun(
         run_id=DAG1_RUN1_ID,
@@ -172,7 +174,16 @@ def setup(request, dag_maker, *, session=None):
         ti.task = task
         ti.state = State.SUCCESS
         session.merge(ti)
-        ti.xcom_push("return_value", f"result_{i}")
+        XComModel.set(
+            key="return_value",
+            value=f"result_{i}",
+            task_id=ti.task_id,
+            dag_id=ti.dag_id,
+            run_id=ti.run_id,
+            map_index=ti.map_index,
+            dag_result=task.returns_dag_result,
+            session=session,
+        )
 
     dag_run2 = dag_maker.create_dagrun(
         run_id=DAG1_RUN2_ID,
@@ -2908,22 +2919,47 @@ class TestWaitDagRun:
         assert response.status_code == 422
 
     @pytest.mark.parametrize(
-        ("run_id", "state"),
-        [(DAG1_RUN1_ID, DAG1_RUN1_STATE), (DAG1_RUN2_ID, DAG1_RUN2_STATE)],
+        ("run_id", "expected"),
+        [
+            pytest.param(
+                DAG1_RUN1_ID,
+                {"state": DAG1_RUN1_STATE, "results": {"task_2": 
'"result_2"'}},
+                id="return-result-task",
+            ),
+            pytest.param(
+                DAG1_RUN2_ID,
+                {"state": DAG1_RUN2_STATE},
+                id="no-result-task",
+            ),
+        ],
     )
-    def test_should_respond_200_immediately_for_finished_run(self, 
test_client, run_id, state):
+    def test_should_respond_200_with_implicit_return_value(self, test_client, 
run_id, expected):
         response = test_client.get(f"/dags/{DAG1_ID}/dagRuns/{run_id}/wait", 
params={"interval": "100"})
         assert response.status_code == 200
         data = response.json()
-        assert data == {"state": state}
+        assert data == expected
 
-    def test_collect_task(self, test_client):
+    @pytest.mark.parametrize(
+        ("requested", "results"),
+        [
+            pytest.param("task_1", {"task_1": '"result_1"'}, 
id="only-non-result"),
+            pytest.param("task_2", {"task_2": '"result_2"'}, id="only-result"),
+        ],
+    )
+    def test_should_respond_200_with_explicit_return_value(self, test_client, 
requested, results):
         response = test_client.get(
-            f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait", 
params={"interval": "1", "result": "task_1"}
+            f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait",
+            params={"interval": "1", "result": requested},
         )
         assert response.status_code == 200
         data = response.json()
-        assert data == {"state": DagRunState.SUCCESS, "results": {"task_1": 
'"result_1"'}}
+        assert data == {"state": DagRunState.SUCCESS, "results": results}
+
+    def test_collect_authored_task_results(self, test_client):
+        response = 
test_client.get(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait", 
params={"interval": "1"})
+        assert response.status_code == 200
+        data = response.json()
+        assert data == {"state": DagRunState.SUCCESS, "results": {"task_2": 
'"result_2"'}}
 
     def test_should_respond_403_when_user_lacks_xcom_permission(self, 
test_client):
         with mock.patch(
@@ -2947,10 +2983,17 @@ class TestWaitDagRun:
 
     def 
test_should_respond_200_without_result_when_user_lacks_xcom_permission(self, 
test_client):
         """Waiting without result parameter should not require XCom 
permissions."""
-        response = test_client.get(
-            f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait",
-            params={"interval": "1"},
-        )
+        with mock.patch(
+            
"airflow.api_fastapi.core_api.routes.public.dag_run.get_auth_manager",
+            autospec=True,
+        ) as mock_get_auth_manager:
+            mock_get_auth_manager.return_value.is_authorized_dag.return_value 
= False
+
+            response = test_client.get(
+                f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/wait",
+                params={"interval": "1"},
+            )
+
         assert response.status_code == 200
         data = response.json()
         assert data == {"state": DagRunState.SUCCESS}

Reply via email to