This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a67d3f98cbc Create test case to verify mapped task as result (#68550)
a67d3f98cbc is described below
commit a67d3f98cbcbf5903dc964c2ddeebb282aef75bd
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Tue Jun 23 18:20:26 2026 +0800
Create test case to verify mapped task as result (#68550)
---
.../core_api/services/public/dag_run.py | 7 ++--
.../core_api/routes/public/test_dag_run.py | 43 +++++++++++++++++++---
2 files changed, 41 insertions(+), 9 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
index ea9d3ca5ce2..856bc1710b4 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
@@ -219,10 +219,11 @@ class DagRunWaiter:
task_ids=self.result_task_ids,
dag_ids=self.dag_id,
)
+ # XComModel.get_many() orders XCom by timestamp. Reset this to make
+ # mapped task results stable since execution order is not guaranteed.
+ xcom_query = xcom_query.order_by(None).order_by(XComModel.task_id,
XComModel.map_index)
async with create_session_async() as session:
- xcom_results = (
- await session.scalars(xcom_query.order_by(XComModel.task_id,
XComModel.map_index))
- ).all()
+ xcom_results = (await session.scalars(xcom_query)).all()
def _group_xcoms(g: Iterator[XComModel | tuple[XComModel]]) -> Any:
entries = [row[0] if isinstance(row, tuple) else row for row in g]
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index ff14aaa8980..9b879e519bf 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -42,8 +42,7 @@ from airflow.models.taskinstance import TaskInstance
from airflow.models.team import Team
from airflow.models.xcom import XComModel
from airflow.providers.standard.operators.empty import EmptyOperator
-from airflow.sdk.definitions.asset import Asset
-from airflow.sdk.definitions.param import Param
+from airflow.sdk import Asset, Param, result, task
from airflow.settings import _configure_async_session
from airflow.timetables.interval import CronDataIntervalTimetable
from airflow.timetables.simple import PartitionAtRuntime,
PartitionedAssetTimetable
@@ -64,6 +63,7 @@ from tests_common.test_utils.db import (
clear_db_serialized_dags,
)
from tests_common.test_utils.format_datetime import from_datetime_to_zulu,
from_datetime_to_zulu_without_ms
+from tests_common.test_utils.taskinstance import run_task_instance
from unit.listeners.class_listener import ClassBasedListener
if TYPE_CHECKING:
@@ -176,9 +176,9 @@ def setup(request, dag_maker, *, session=None):
# Set conf for testing conf_contains filter (values ordered for
predictable sorting)
dag_run1.conf = {"env": "development", "version": "1.0"}
- for i, task in enumerate([task1, task2], start=1):
- ti = dag_run1.get_task_instance(task_id=task.task_id)
- ti.task = task
+ for i, t in enumerate([task1, task2], start=1):
+ ti = dag_run1.get_task_instance(task_id=t.task_id)
+ ti.task = t
ti.state = State.SUCCESS
session.merge(ti)
XComModel.set(
@@ -188,7 +188,7 @@ def setup(request, dag_maker, *, session=None):
dag_id=ti.dag_id,
run_id=ti.run_id,
map_index=ti.map_index,
- dag_result=task.returns_dag_result,
+ dag_result=t.returns_dag_result,
session=session,
)
@@ -3172,6 +3172,37 @@ class TestWaitDagRun:
data = response.json()
assert data == {"state": DagRunState.SUCCESS}
+ def test_collect_mapped_task_dag_result(self, test_client, dag_maker,
session):
+ """XComs from a mapped @result task are aggregated into a list ordered
by map_index."""
+ with dag_maker("dag_mapped_result"):
+
+ @result
+ @task(task_id="a")
+ def double(v):
+ return v * 2
+
+ mapped = double.expand(v=[1, 2])
+
+ mapped_op = mapped.operator # MappedOperator with
returns_dag_result=True
+
+ dag_run = dag_maker.create_dagrun(
+ run_id="mapped_run_1",
+ state=DagRunState.SUCCESS,
+ run_type=DagRunType.MANUAL,
+ triggered_by=DagRunTriggeredByType.UI,
+ logical_date=LOGICAL_DATE1,
+ )
+ for ti in dag_run.task_instances:
+ run_task_instance(ti, mapped_op, session=session)
+ session.commit()
+
+ response = test_client.get(
+ f"/dags/dag_mapped_result/dagRuns/{dag_run.run_id}/wait",
+ params={"interval": "1"},
+ )
+ assert response.status_code == 200
+ assert response.json() == {"state": DagRunState.SUCCESS, "results":
{"a": [2, 4]}}
+
class TestBulkDagRuns:
ENDPOINT_URL = f"/dags/{DAG1_ID}/dagRuns"