This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a67d3f98cbc Create test case to verify mapped task as result (#68550)
a67d3f98cbc is described below

commit a67d3f98cbcbf5903dc964c2ddeebb282aef75bd
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Tue Jun 23 18:20:26 2026 +0800

    Create test case to verify mapped task as result (#68550)
---
 .../core_api/services/public/dag_run.py            |  7 ++--
 .../core_api/routes/public/test_dag_run.py         | 43 +++++++++++++++++++---
 2 files changed, 41 insertions(+), 9 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
index ea9d3ca5ce2..856bc1710b4 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
@@ -219,10 +219,11 @@ class DagRunWaiter:
                 task_ids=self.result_task_ids,
                 dag_ids=self.dag_id,
             )
+        # XComModel.get_many() orders XCom by timestamp. Reset this to make
+        # mapped task results stable since execution order is not guaranteed.
+        xcom_query = xcom_query.order_by(None).order_by(XComModel.task_id, XComModel.map_index)
         async with create_session_async() as session:
-            xcom_results = (
-                await session.scalars(xcom_query.order_by(XComModel.task_id, XComModel.map_index))
-            ).all()
+            xcom_results = (await session.scalars(xcom_query)).all()
 
         def _group_xcoms(g: Iterator[XComModel | tuple[XComModel]]) -> Any:
             entries = [row[0] if isinstance(row, tuple) else row for row in g]
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index ff14aaa8980..9b879e519bf 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -42,8 +42,7 @@ from airflow.models.taskinstance import TaskInstance
 from airflow.models.team import Team
 from airflow.models.xcom import XComModel
 from airflow.providers.standard.operators.empty import EmptyOperator
-from airflow.sdk.definitions.asset import Asset
-from airflow.sdk.definitions.param import Param
+from airflow.sdk import Asset, Param, result, task
 from airflow.settings import _configure_async_session
 from airflow.timetables.interval import CronDataIntervalTimetable
 from airflow.timetables.simple import PartitionAtRuntime, PartitionedAssetTimetable
@@ -64,6 +63,7 @@ from tests_common.test_utils.db import (
     clear_db_serialized_dags,
 )
 from tests_common.test_utils.format_datetime import from_datetime_to_zulu, from_datetime_to_zulu_without_ms
+from tests_common.test_utils.taskinstance import run_task_instance
 from unit.listeners.class_listener import ClassBasedListener
 
 if TYPE_CHECKING:
@@ -176,9 +176,9 @@ def setup(request, dag_maker, *, session=None):
     # Set conf for testing conf_contains filter (values ordered for predictable sorting)
     dag_run1.conf = {"env": "development", "version": "1.0"}
 
-    for i, task in enumerate([task1, task2], start=1):
-        ti = dag_run1.get_task_instance(task_id=task.task_id)
-        ti.task = task
+    for i, t in enumerate([task1, task2], start=1):
+        ti = dag_run1.get_task_instance(task_id=t.task_id)
+        ti.task = t
         ti.state = State.SUCCESS
         session.merge(ti)
         XComModel.set(
@@ -188,7 +188,7 @@ def setup(request, dag_maker, *, session=None):
             dag_id=ti.dag_id,
             run_id=ti.run_id,
             map_index=ti.map_index,
-            dag_result=task.returns_dag_result,
+            dag_result=t.returns_dag_result,
             session=session,
         )
 
@@ -3172,6 +3172,37 @@ class TestWaitDagRun:
         data = response.json()
         assert data == {"state": DagRunState.SUCCESS}
 
+    def test_collect_mapped_task_dag_result(self, test_client, dag_maker, session):
+        """XComs from a mapped @result task are aggregated into a list ordered by map_index."""
+        with dag_maker("dag_mapped_result"):
+
+            @result
+            @task(task_id="a")
+            def double(v):
+                return v * 2
+
+            mapped = double.expand(v=[1, 2])
+
+        mapped_op = mapped.operator  # MappedOperator with returns_dag_result=True
+
+        dag_run = dag_maker.create_dagrun(
+            run_id="mapped_run_1",
+            state=DagRunState.SUCCESS,
+            run_type=DagRunType.MANUAL,
+            triggered_by=DagRunTriggeredByType.UI,
+            logical_date=LOGICAL_DATE1,
+        )
+        for ti in dag_run.task_instances:
+            run_task_instance(ti, mapped_op, session=session)
+        session.commit()
+
+        response = test_client.get(
+            f"/dags/dag_mapped_result/dagRuns/{dag_run.run_id}/wait",
+            params={"interval": "1"},
+        )
+        assert response.status_code == 200
+        assert response.json() == {"state": DagRunState.SUCCESS, "results": {"a": [2, 4]}}
+
 
 class TestBulkDagRuns:
     ENDPOINT_URL = f"/dags/{DAG1_ID}/dagRuns"

Reply via email to