This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 3251e68cf9c509cd08f0067a9e8adcc354dc3263
Author: Isaiah Iruoha <[email protected]>
AuthorDate: Wed Aug 13 06:22:06 2025 -0700

    fix map_index is null dynamic task mapping bug (#54249)

    closes: #52881

    When tasks within nested task groups try to access XCom values from
    sibling tasks in the same mapped parent task group instance, the system
    incorrectly uses `map_index=null` instead of the correct map index,
    causing XCom lookup failures.

    <img width="1033" height="494" alt="Screenshot 2025-08-07 at 1 47 06 PM" src="https://github.com/user-attachments/assets/f6c1767b-a999-470d-b090-478643dcdc01" />

    ```javascript
    No XCom value found; defaulting to None.: key="return_value": dag_id="test_dag": task_id="expandable_task_group.inner_task_group.alter_input": run_id="manual__2025-07-04T14:56:47.815002+00:00": map_index=null
    ```

    The issue is in
    `airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py`
    in the `_get_upstream_map_indexes` function. The logic only checks the
    immediate parent task group (`upstream_task.task_group`) but doesn't
    consider nested scenarios where tasks are within a regular task group
    that is itself within a mapped task group.

    Replace the immediate parent check with a hierarchy-aware check using
    `get_closest_mapped_task_group()`. This ensures that both tasks
    recognize they share the same mapped task group ancestor and use the
    same map_index.

    - Verified that the proposed solution resolves issue #52881 when tested
      locally using `breeze start-airflow` against the latest main branch
      development environment.
    - Added `test_nested_mapped_task_group_upstream_indexes` to verify that
      tasks in nested mapped task groups correctly resolve upstream map
      indexes.
(cherry picked from commit f685c61704017a256fbb4393b3f4d0e6722aa869) --- .../execution_api/routes/task_instances.py | 11 ++-- .../versions/head/test_task_instances.py | 75 ++++++++++++++++++++++ 2 files changed, 80 insertions(+), 6 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index e22d0a5f34d..ea56e2f37d4 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -66,7 +66,6 @@ from airflow.models.trigger import Trigger from airflow.models.xcom import XComModel from airflow.sdk.definitions._internal.expandinput import NotFullyPopulated from airflow.sdk.definitions.asset import Asset, AssetUniqueKey -from airflow.sdk.definitions.taskgroup import MappedTaskGroup from airflow.utils import timezone from airflow.utils.state import DagRunState, TaskInstanceState, TerminalTIState @@ -289,20 +288,20 @@ def ti_run( def _get_upstream_map_indexes( task: Operator, ti_map_index: int, run_id: str, session: SessionDep ) -> Iterator[tuple[str, int | list[int] | None]]: + task_mapped_group = task.get_closest_mapped_task_group() for upstream_task in task.upstream_list: + upstream_mapped_group = upstream_task.get_closest_mapped_task_group() map_indexes: int | list[int] | None - if not isinstance(upstream_task.task_group, MappedTaskGroup): + if upstream_mapped_group is None: # regular tasks or non-mapped task groups map_indexes = None - elif task.task_group == upstream_task.task_group: - # tasks in the same mapped task group - # the task should use the map_index as the previous task in the same mapped task group + elif task_mapped_group == upstream_mapped_group: + # tasks in the same mapped task group hierarchy map_indexes = ti_map_index else: # tasks not in the same mapped task group # the upstream mapped task group should combine the 
return xcom as a list and return it mapped_ti_count: int - upstream_mapped_group = upstream_task.task_group try: # for cases that does not need to resolve xcom mapped_ti_count = upstream_mapped_group.get_parse_time_mapped_ti_count() diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index d18b18b673e..4017a04d82e 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -301,6 +301,81 @@ class TestTIRunState: upstream_map_indexes = response.json()["upstream_map_indexes"] assert upstream_map_indexes == expected_upstream_map_indexes[(ti.task_id, ti.map_index)] + def test_nested_mapped_task_group_upstream_indexes(self, client, dag_maker): + """ + Test that upstream_map_indexes are correctly computed for tasks in nested mapped task groups. 
+ """ + with dag_maker("test_nested_mapped_tg", serialized=True): + + @task + def alter_input(inp: str) -> str: + return f"{inp}_Altered" + + @task + def print_task(orig_input: str, altered_input: str) -> str: + return f"orig:{orig_input},altered:{altered_input}" + + @task_group + def inner_task_group(orig_input: str) -> None: + altered_input = alter_input(orig_input) + print_task(orig_input, altered_input) + + @task_group + def expandable_task_group(param: str) -> None: + inner_task_group(param) + + expandable_task_group.expand(param=["One", "Two", "Three"]) + + dr = dag_maker.create_dagrun() + + # Set all alter_input tasks to success so print_task can run + for ti in dr.get_task_instances(): + if "alter_input" in ti.task_id and ti.map_index >= 0: + ti.state = State.SUCCESS + elif "print_task" in ti.task_id and ti.map_index >= 0: + ti.set_state(State.QUEUED) + dag_maker.session.flush() + + # Expected upstream_map_indexes for each print_task instance + expected_upstream_map_indexes = { + ("expandable_task_group.inner_task_group.print_task", 0): { + "expandable_task_group.inner_task_group.alter_input": 0 + }, + ("expandable_task_group.inner_task_group.print_task", 1): { + "expandable_task_group.inner_task_group.alter_input": 1 + }, + ("expandable_task_group.inner_task_group.print_task", 2): { + "expandable_task_group.inner_task_group.alter_input": 2 + }, + } + + # Get only the expanded print_task instances (not the template) + print_task_tis = [ + ti for ti in dr.get_task_instances() if "print_task" in ti.task_id and ti.map_index >= 0 + ] + + # Test each print_task instance + for ti in print_task_tis: + response = client.patch( + f"/execution/task-instances/{ti.id}/run", + json={ + "state": "running", + "hostname": "random-hostname", + "unixname": "random-unixname", + "pid": 100, + "start_date": "2024-09-30T12:00:00Z", + }, + ) + + assert response.status_code == 200 + upstream_map_indexes = response.json()["upstream_map_indexes"] + expected = 
expected_upstream_map_indexes[(ti.task_id, ti.map_index)] + + assert upstream_map_indexes == expected, ( + f"Task {ti.task_id}[{ti.map_index}] should have upstream_map_indexes {expected}, " + f"but got {upstream_map_indexes}" + ) + def test_dynamic_task_mapping_with_xcom(self, client, dag_maker, create_task_instance, session, run_task): """ Test that the Task Instance upstream_map_indexes is correctly fetched when to running the Task Instances with xcom
