Copilot commented on code in PR #60016:
URL: https://github.com/apache/airflow/pull/60016#discussion_r3066489725


##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -3131,6 +3131,88 @@ def test_find_relevant_relatives_with_non_mapped_task_as_tuple(dag_maker, sessio
     assert result == {"t1"}
 
 
+@pytest.mark.parametrize("map_index", [0, 1, 2])
+def test_find_relevant_relatives_downstream_same_group_siblings(dag_maker, session, map_index):
+    """
+    Test that clearing a specific mapped task instance in a mapped task group
+    only clears downstream siblings with the same map_index (depth-first execution).
+
+    This tests the fix for issue #40543 where clearing etl.e[map_index] should only
+    clear etl.t[map_index] and etl.l[map_index], not all instances of etl.t[] and etl.l[].
+    """
+    files = ["a", "b", "c"]
+
+    with dag_maker(session=session) as dag:
+
+        @task_group(group_id="etl")
+        def etl_pipeline(file):
+            e = EmptyOperator(task_id="e")
+            t = EmptyOperator(task_id="t")
+            l = EmptyOperator(task_id="l")
+            e >> t >> l
+
+        etl = etl_pipeline.expand(file=files)

Review Comment:
   `etl` is assigned but never used. With Ruff's default `F` rules enabled, 
this will raise an unused-variable violation (F841). Consider dropping the 
assignment and just calling `etl_pipeline.expand(...)`, or use `_ = ...` if the 
assignment is needed for readability.
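For illustration only, here is a toy approximation of what an F841-style check looks at: it parses source with Python's `ast` module and reports names that are assigned inside a function but never read there. It is not Ruff's implementation. The function name `unused_locals` and the sample source are hypothetical, nested functions are lumped into the enclosing one, and skipping `_`-prefixed names is a simplification of the "dummy variable" convention that linters apply.

```python
import ast


def unused_locals(source: str) -> dict[str, set[str]]:
    """Toy F841-style check: names assigned in a function but never read there.

    Simplifications: nested functions are also walked as part of the enclosing
    function, and any name starting with "_" is treated as an intentional discard.
    """
    report: dict[str, set[str]] = {}
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        stored: set[str] = set()
        loaded: set[str] = set()
        for child in ast.walk(node):
            if isinstance(child, ast.Name):
                if isinstance(child.ctx, ast.Store):
                    stored.add(child.id)
                elif isinstance(child.ctx, ast.Load):
                    loaded.add(child.id)
        unused = {name for name in stored - loaded if not name.startswith("_")}
        if unused:
            report[node.name] = unused
    return report


# Hypothetical, trimmed-down shape of the test under review (parsed, never run).
BEFORE = """
def test_example(dag_maker):
    with dag_maker() as dag:
        def etl_pipeline(file):
            e = make("e")
            t = make("t")
            e >> t
        etl = etl_pipeline.expand(file=["a", "b"])
    run(dag)
"""

AFTER_BARE_CALL = BEFORE.replace("etl = etl_pipeline", "etl_pipeline")
AFTER_DISCARD = BEFORE.replace("etl = etl_pipeline", "_ = etl_pipeline")

print(unused_locals(BEFORE))  # {'test_example': {'etl'}}
print(unused_locals(AFTER_BARE_CALL))  # {}
print(unused_locals(AFTER_DISCARD))  # {}
```

Both suggested fixes (a bare `etl_pipeline.expand(...)` call, or assigning to `_`) leave nothing to report in this model.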
   



##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -3131,6 +3131,88 @@ def test_find_relevant_relatives_with_non_mapped_task_as_tuple(dag_maker, sessio
     assert result == {"t1"}
 
 
+@pytest.mark.parametrize("map_index", [0, 1, 2])
+def test_find_relevant_relatives_downstream_same_group_siblings(dag_maker, session, map_index):
+    """
+    Test that clearing a specific mapped task instance in a mapped task group
+    only clears downstream siblings with the same map_index (depth-first execution).
+
+    This tests the fix for issue #40543 where clearing etl.e[map_index] should only
+    clear etl.t[map_index] and etl.l[map_index], not all instances of etl.t[] and etl.l[].
+    """
+    files = ["a", "b", "c"]
+
+    with dag_maker(session=session) as dag:
+
+        @task_group(group_id="etl")
+        def etl_pipeline(file):
+            e = EmptyOperator(task_id="e")
+            t = EmptyOperator(task_id="t")
+            l = EmptyOperator(task_id="l")
+            e >> t >> l
+
+        etl = etl_pipeline.expand(file=files)
+
+    dr = dag_maker.create_dagrun(state="success")
+
+    # When clearing etl.e[map_index] with include_downstream=True,
+    # only etl.t[map_index] and etl.l[map_index] should be cleared (same map_index)
+    result = find_relevant_relatives(
+        normal_tasks=[],
+        mapped_tasks=[("etl.e", map_index)],
+        direction="downstream",
+        dag=dag,
+        run_id=dr.run_id,
+        session=session,
+    )
+
+    # Should only return t[map_index] and l[map_index], not all instances
+    expected = {("etl.t", map_index), ("etl.l", map_index)}
+    assert result == expected, (
+        f"Expected only siblings with same map_index ({map_index}), "
+        f"but got {result}. This violates depth-first execution."
+    )
+
+
+def test_find_relevant_relatives_upstream_same_group_siblings(dag_maker, session):
+    """
+    Test that clearing a specific mapped task instance in a mapped task group
+    only clears upstream siblings with the same map_index (depth-first execution).
+    """
+    files = ["a", "b", "c"]
+
+    with dag_maker(session=session) as dag:
+
+        @task_group(group_id="etl")
+        def etl_pipeline(file):
+            e = EmptyOperator(task_id="e")
+            t = EmptyOperator(task_id="t")
+            l = EmptyOperator(task_id="l")
+            e >> t >> l
+
+        etl = etl_pipeline.expand(file=files)

Review Comment:
   `etl` is assigned but never used here as well, which is likely to fail 
Ruff's unused-variable check (F841). Consider removing the assignment or 
assigning to `_` instead.
   



##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -3131,6 +3131,88 @@ def test_find_relevant_relatives_with_non_mapped_task_as_tuple(dag_maker, sessio
     assert result == {"t1"}
 
 
+@pytest.mark.parametrize("map_index", [0, 1, 2])
+def test_find_relevant_relatives_downstream_same_group_siblings(dag_maker, session, map_index):
+    """
+    Test that clearing a specific mapped task instance in a mapped task group
+    only clears downstream siblings with the same map_index (depth-first execution).
+
+    This tests the fix for issue #40543 where clearing etl.e[map_index] should only
+    clear etl.t[map_index] and etl.l[map_index], not all instances of etl.t[] and etl.l[].

Review Comment:
   The test docstring references a specific issue number ("issue #40543"). 
Airflow's test guidelines generally discourage embedding ticket/issue numbers 
in test docstrings, since they become stale context; prefer describing the 
behavior only, and keep issue references in the PR description or code comments 
where needed.
   



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -2186,6 +2186,15 @@ def tg2(inp):
     # between the ancestor and further expansion happened inside it.
 
     ancestor_ti_count = get_mapped_ti_count(common_ancestor, run_id, session=session)
+
+    # Special case: If both tasks are siblings in the same mapped task group
+    # (ti_count == ancestor_ti_count), they share the same map_index.
+    # This ensures depth-first execution where each mapped instance operates independently.
+    if ti_count == ancestor_ti_count:
+        # Both task and relative are in the same mapped group with no further expansion.
+        # They should share the same map_index.
+        return map_index
+

Review Comment:
   The early return when `ti_count == ancestor_ti_count` short-circuits the 
later `_is_further_mapped_inside(relative, common_ancestor)` check. This can 
produce an incorrect single `map_index` when the *relative* is further mapped 
inside the common ancestor (e.g., a mapped operator nested inside the same 
mapped task group), where the function should instead return a `range` for the 
relevant subset. This special case also appears redundant because the existing 
`ancestor_map_index` logic already yields `map_index` in this scenario when the 
relative is not further mapped. Consider removing this branch, or at least 
gating it on `not _is_further_mapped_inside(relative, common_ancestor)` so 
nested mapping continues to work correctly.
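To make the short-circuit concrete, here is a deliberately simplified model of the index arithmetic, assuming all counts divide evenly. It is not the actual Airflow code: the function names and the `relative_inner_count` parameter (a hypothetical stand-in for what `_is_further_mapped_inside` detects, plus how many instances the relative has per group expansion) are invented for illustration. It compares the path without the new branch, the branch as added in this PR, and the gated variant suggested above.

```python
from typing import Optional, Union


def relevant_map_indexes(
    map_index: int,
    ti_count: int,
    ancestor_ti_count: int,
    relative_inner_count: Optional[int],
) -> Union[int, range]:
    """Simplified model of the existing logic, without the new early return.

    relative_inner_count (hypothetical) is None when the relative is not further
    mapped inside the common mapped group; otherwise it is how many instances the
    relative has per expansion of that group.
    """
    # Which expansion of the common mapped group this task instance belongs to.
    ancestor_map_index = map_index * ancestor_ti_count // ti_count
    if relative_inner_count is None:
        # Not further mapped: exactly one relevant relative instance.
        return ancestor_map_index
    # Further mapped: every relative instance inside that group expansion.
    start = ancestor_map_index * relative_inner_count
    return range(start, start + relative_inner_count)


def relevant_map_indexes_early_return(
    map_index: int,
    ti_count: int,
    ancestor_ti_count: int,
    relative_inner_count: Optional[int],
) -> Union[int, range]:
    """Same model with the special case from this PR placed first."""
    if ti_count == ancestor_ti_count:
        # Returns before the relative's own further mapping is considered.
        return map_index
    return relevant_map_indexes(map_index, ti_count, ancestor_ti_count, relative_inner_count)


def relevant_map_indexes_gated(
    map_index: int,
    ti_count: int,
    ancestor_ti_count: int,
    relative_inner_count: Optional[int],
) -> Union[int, range]:
    """Special case kept, but only when the relative is not further mapped."""
    if ti_count == ancestor_ti_count and relative_inner_count is None:
        return map_index
    return relevant_map_indexes(map_index, ti_count, ancestor_ti_count, relative_inner_count)


# "etl" expanded over 3 files; etl.e is not further mapped, so ti_count == 3.
# Sibling not further mapped: the variants agree, so the new branch adds nothing.
print(relevant_map_indexes(1, 3, 3, None), relevant_map_indexes_early_return(1, 3, 3, None))  # 1 1
# Sibling further mapped twice per etl expansion: the early return loses the range.
print(relevant_map_indexes(1, 3, 3, 2))  # range(2, 4)
print(relevant_map_indexes_early_return(1, 3, 3, 2))  # 1
print(relevant_map_indexes_gated(1, 3, 3, 2))  # range(2, 4)
```

In this model, whenever `ti_count == ancestor_ti_count`, `map_index * ancestor_ti_count // ti_count` already equals `map_index`, which is why the branch only changes behavior in the further-mapped case, and there it changes it for the worse.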
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
