This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3251e68cf9c509cd08f0067a9e8adcc354dc3263
Author: Isaiah Iruoha <[email protected]>
AuthorDate: Wed Aug 13 06:22:06 2025 -0700

    fix map_index is null dynamic task mapping bug (#54249)
    
    closes: #52881
    
    When tasks within nested task groups try to access XCom values from sibling 
tasks in the same mapped parent task group instance, the system incorrectly 
uses `map_index=null` instead of the correct map index, causing XCom lookup 
failures.
    
    <img width="1033" height="494" alt="Screenshot 2025-08-07 at 1 47 06 PM" 
src="https://github.com/user-attachments/assets/f6c1767b-a999-470d-b090-478643dcdc01"
 />
    
    ```text
    No XCom value found; defaulting to None.: key="return_value": 
dag_id="test_dag": 
task_id="expandable_task_group.inner_task_group.alter_input": 
run_id="manual__2025-07-04T14:56:47.815002+00:00": map_index=null
    ```
    The issue is in 
`airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py` 
in the `_get_upstream_map_indexes` function.
    
    The logic only checks the immediate parent task group 
(`upstream_task.task_group`) and does not account for nested scenarios where 
tasks are within a regular task group that is itself within a mapped task group.
    
    Replace the immediate parent check with a hierarchy-aware check using 
`get_closest_mapped_task_group()`. This ensures that both tasks recognize they 
share the same mapped task group ancestor and use the same map_index.
    
    - Verified locally that the proposed solution resolves issue #52881, 
using `breeze start-airflow` against the latest main branch development 
environment.
    - Added `test_nested_mapped_task_group_upstream_indexes` to verify that 
tasks in nested mapped task groups correctly resolve upstream map indexes.
    
    (cherry picked from commit f685c61704017a256fbb4393b3f4d0e6722aa869)
---
 .../execution_api/routes/task_instances.py         | 11 ++--
 .../versions/head/test_task_instances.py           | 75 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 6 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index e22d0a5f34d..ea56e2f37d4 100644
--- 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -66,7 +66,6 @@ from airflow.models.trigger import Trigger
 from airflow.models.xcom import XComModel
 from airflow.sdk.definitions._internal.expandinput import NotFullyPopulated
 from airflow.sdk.definitions.asset import Asset, AssetUniqueKey
-from airflow.sdk.definitions.taskgroup import MappedTaskGroup
 from airflow.utils import timezone
 from airflow.utils.state import DagRunState, TaskInstanceState, TerminalTIState
 
@@ -289,20 +288,20 @@ def ti_run(
 def _get_upstream_map_indexes(
     task: Operator, ti_map_index: int, run_id: str, session: SessionDep
 ) -> Iterator[tuple[str, int | list[int] | None]]:
+    task_mapped_group = task.get_closest_mapped_task_group()
     for upstream_task in task.upstream_list:
+        upstream_mapped_group = upstream_task.get_closest_mapped_task_group()
         map_indexes: int | list[int] | None
-        if not isinstance(upstream_task.task_group, MappedTaskGroup):
+        if upstream_mapped_group is None:
             # regular tasks or non-mapped task groups
             map_indexes = None
-        elif task.task_group == upstream_task.task_group:
-            # tasks in the same mapped task group
-            # the task should use the map_index as the previous task in the 
same mapped task group
+        elif task_mapped_group == upstream_mapped_group:
+            # tasks in the same mapped task group hierarchy
             map_indexes = ti_map_index
         else:
             # tasks not in the same mapped task group
             # the upstream mapped task group should combine the return xcom as 
a list and return it
             mapped_ti_count: int
-            upstream_mapped_group = upstream_task.task_group
             try:
                 # for cases that does not need to resolve xcom
                 mapped_ti_count = 
upstream_mapped_group.get_parse_time_mapped_ti_count()
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index d18b18b673e..4017a04d82e 100644
--- 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -301,6 +301,81 @@ class TestTIRunState:
             upstream_map_indexes = response.json()["upstream_map_indexes"]
             assert upstream_map_indexes == 
expected_upstream_map_indexes[(ti.task_id, ti.map_index)]
 
+    def test_nested_mapped_task_group_upstream_indexes(self, client, 
dag_maker):
+        """
+        Test that upstream_map_indexes are correctly computed for tasks in 
nested mapped task groups.
+        """
+        with dag_maker("test_nested_mapped_tg", serialized=True):
+
+            @task
+            def alter_input(inp: str) -> str:
+                return f"{inp}_Altered"
+
+            @task
+            def print_task(orig_input: str, altered_input: str) -> str:
+                return f"orig:{orig_input},altered:{altered_input}"
+
+            @task_group
+            def inner_task_group(orig_input: str) -> None:
+                altered_input = alter_input(orig_input)
+                print_task(orig_input, altered_input)
+
+            @task_group
+            def expandable_task_group(param: str) -> None:
+                inner_task_group(param)
+
+            expandable_task_group.expand(param=["One", "Two", "Three"])
+
+        dr = dag_maker.create_dagrun()
+
+        # Set all alter_input tasks to success so print_task can run
+        for ti in dr.get_task_instances():
+            if "alter_input" in ti.task_id and ti.map_index >= 0:
+                ti.state = State.SUCCESS
+            elif "print_task" in ti.task_id and ti.map_index >= 0:
+                ti.set_state(State.QUEUED)
+        dag_maker.session.flush()
+
+        # Expected upstream_map_indexes for each print_task instance
+        expected_upstream_map_indexes = {
+            ("expandable_task_group.inner_task_group.print_task", 0): {
+                "expandable_task_group.inner_task_group.alter_input": 0
+            },
+            ("expandable_task_group.inner_task_group.print_task", 1): {
+                "expandable_task_group.inner_task_group.alter_input": 1
+            },
+            ("expandable_task_group.inner_task_group.print_task", 2): {
+                "expandable_task_group.inner_task_group.alter_input": 2
+            },
+        }
+
+        # Get only the expanded print_task instances (not the template)
+        print_task_tis = [
+            ti for ti in dr.get_task_instances() if "print_task" in ti.task_id 
and ti.map_index >= 0
+        ]
+
+        # Test each print_task instance
+        for ti in print_task_tis:
+            response = client.patch(
+                f"/execution/task-instances/{ti.id}/run",
+                json={
+                    "state": "running",
+                    "hostname": "random-hostname",
+                    "unixname": "random-unixname",
+                    "pid": 100,
+                    "start_date": "2024-09-30T12:00:00Z",
+                },
+            )
+
+            assert response.status_code == 200
+            upstream_map_indexes = response.json()["upstream_map_indexes"]
+            expected = expected_upstream_map_indexes[(ti.task_id, 
ti.map_index)]
+
+            assert upstream_map_indexes == expected, (
+                f"Task {ti.task_id}[{ti.map_index}] should have 
upstream_map_indexes {expected}, "
+                f"but got {upstream_map_indexes}"
+            )
+
     def test_dynamic_task_mapping_with_xcom(self, client, dag_maker, 
create_task_instance, session, run_task):
         """
         Test that the Task Instance upstream_map_indexes is correctly fetched 
when to running the Task Instances with xcom

Reply via email to