Copilot commented on code in PR #60016:
URL: https://github.com/apache/airflow/pull/60016#discussion_r3066489725
##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -3131,6 +3131,88 @@ def
test_find_relevant_relatives_with_non_mapped_task_as_tuple(dag_maker, sessio
assert result == {"t1"}
+@pytest.mark.parametrize("map_index", [0, 1, 2])
+def test_find_relevant_relatives_downstream_same_group_siblings(dag_maker,
session, map_index):
+ """
+ Test that clearing a specific mapped task instance in a mapped task group
+ only clears downstream siblings with the same map_index (depth-first
execution).
+
+ This tests the fix for issue #40543 where clearing etl.e[map_index] should
only
+ clear etl.t[map_index] and etl.l[map_index], not all instances of etl.t[]
and etl.l[].
+ """
+ files = ["a", "b", "c"]
+
+ with dag_maker(session=session) as dag:
+
+ @task_group(group_id="etl")
+ def etl_pipeline(file):
+ e = EmptyOperator(task_id="e")
+ t = EmptyOperator(task_id="t")
+ l = EmptyOperator(task_id="l")
+ e >> t >> l
+
+ etl = etl_pipeline.expand(file=files)
Review Comment:
`etl` is assigned but never used. With Ruff's default `F` rules enabled,
this will raise an unused-variable violation (F841). Consider dropping the
assignment and just calling `etl_pipeline.expand(...)`, or use `_ = ...` if the
assignment is needed for readability.
##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -3131,6 +3131,88 @@ def
test_find_relevant_relatives_with_non_mapped_task_as_tuple(dag_maker, sessio
assert result == {"t1"}
+@pytest.mark.parametrize("map_index", [0, 1, 2])
+def test_find_relevant_relatives_downstream_same_group_siblings(dag_maker,
session, map_index):
+ """
+ Test that clearing a specific mapped task instance in a mapped task group
+ only clears downstream siblings with the same map_index (depth-first
execution).
+
+ This tests the fix for issue #40543 where clearing etl.e[map_index] should
only
+ clear etl.t[map_index] and etl.l[map_index], not all instances of etl.t[]
and etl.l[].
+ """
+ files = ["a", "b", "c"]
+
+ with dag_maker(session=session) as dag:
+
+ @task_group(group_id="etl")
+ def etl_pipeline(file):
+ e = EmptyOperator(task_id="e")
+ t = EmptyOperator(task_id="t")
+ l = EmptyOperator(task_id="l")
+ e >> t >> l
+
+ etl = etl_pipeline.expand(file=files)
+
+ dr = dag_maker.create_dagrun(state="success")
+
+ # When clearing etl.e[map_index] with include_downstream=True,
+ # only etl.t[map_index] and etl.l[map_index] should be cleared (same
map_index)
+ result = find_relevant_relatives(
+ normal_tasks=[],
+ mapped_tasks=[("etl.e", map_index)],
+ direction="downstream",
+ dag=dag,
+ run_id=dr.run_id,
+ session=session,
+ )
+
+ # Should only return t[map_index] and l[map_index], not all instances
+ expected = {("etl.t", map_index), ("etl.l", map_index)}
+ assert result == expected, (
+ f"Expected only siblings with same map_index ({map_index}), "
+ f"but got {result}. This violates depth-first execution."
+ )
+
+
+def test_find_relevant_relatives_upstream_same_group_siblings(dag_maker,
session):
+ """
+ Test that clearing a specific mapped task instance in a mapped task group
+ only clears upstream siblings with the same map_index (depth-first
execution).
+ """
+ files = ["a", "b", "c"]
+
+ with dag_maker(session=session) as dag:
+
+ @task_group(group_id="etl")
+ def etl_pipeline(file):
+ e = EmptyOperator(task_id="e")
+ t = EmptyOperator(task_id="t")
+ l = EmptyOperator(task_id="l")
+ e >> t >> l
+
+ etl = etl_pipeline.expand(file=files)
Review Comment:
`etl` is assigned but never used here as well, which is likely to fail
Ruff's unused-variable check (F841). Consider removing the assignment or
assigning to `_` instead.
##########
airflow-core/tests/unit/models/test_taskinstance.py:
##########
@@ -3131,6 +3131,88 @@ def
test_find_relevant_relatives_with_non_mapped_task_as_tuple(dag_maker, sessio
assert result == {"t1"}
+@pytest.mark.parametrize("map_index", [0, 1, 2])
+def test_find_relevant_relatives_downstream_same_group_siblings(dag_maker,
session, map_index):
+ """
+ Test that clearing a specific mapped task instance in a mapped task group
+ only clears downstream siblings with the same map_index (depth-first
execution).
+
+ This tests the fix for issue #40543 where clearing etl.e[map_index] should
only
+ clear etl.t[map_index] and etl.l[map_index], not all instances of etl.t[]
and etl.l[].
Review Comment:
The test docstring references a specific issue number ("issue #40543").
Airflow's test guidelines generally discourage embedding ticket/issue numbers
in test docstrings since it becomes stale context; prefer describing the
behavior only, and keep issue references in the PR description or code comments
where needed.
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -2186,6 +2186,15 @@ def tg2(inp):
# between the ancestor and further expansion happened inside it.
ancestor_ti_count = get_mapped_ti_count(common_ancestor, run_id,
session=session)
+
+ # Special case: If both tasks are siblings in the same mapped task group
+ # (ti_count == ancestor_ti_count), they share the same map_index.
+ # This ensures depth-first execution where each mapped instance operates
independently.
+ if ti_count == ancestor_ti_count:
+ # Both task and relative are in the same mapped group with no further
expansion.
+ # They should share the same map_index.
+ return map_index
+
Review Comment:
The early return when `ti_count == ancestor_ti_count` short-circuits the
later `_is_further_mapped_inside(relative, common_ancestor)` check. This can
produce an incorrect single `map_index` when the *relative* is further mapped
inside the common ancestor (e.g., a mapped operator nested inside the same
mapped task group), where the function should instead return a `range` for the
relevant subset. This special case also appears redundant because the existing
`ancestor_map_index` logic already yields `map_index` in this scenario when the
relative is not further mapped. Consider removing this branch, or at least
gating it on `not _is_further_mapped_inside(relative, common_ancestor)` so
nested mapping continues to work correctly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org