This is an automated email from the ASF dual-hosted git repository.

kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 862b647facf Fix per-index evaluation of ONE_FAILED in mapped task 
groups (#67684)
862b647facf is described below

commit 862b647facf4d6df67c63218257fd0dcbe11c315
Author: Shahar Epstein <[email protected]>
AuthorDate: Fri May 29 13:25:02 2026 +0300

    Fix per-index evaluation of ONE_FAILED in mapped task groups (#67684)
    
    A task using a "fast triggered" trigger rule (ONE_FAILED, ONE_SUCCESS,
    ONE_DONE) inside a mapped task group was evaluated against every expanded
    instance of its upstream, instead of the upstream instance sharing its own
    map index. As a result a single failed (or succeeded) upstream instance
    wrongly triggered the rule for every expanded instance of the task — e.g. a
    mapped ONE_FAILED reporting task ran for all map indexes when only one
    upstream had failed.
    
    The broad "depend on every upstream instance" behavior is only needed for
    the not-yet-expanded summary task instance (map_index < 0), so a fast
    trigger rule does not prematurely skip the task before the mapped task group
    expands (the case fixed in #34023). Restrict that special case to the
    summary instance; expanded instances now use the normal per-map-index
    upstream resolution.
    
    closes: #50210
---
 airflow-core/newsfragments/67684.bugfix.rst        |   1 +
 .../src/airflow/ti_deps/deps/trigger_rule_dep.py   |   9 +-
 .../tests/unit/models/test_mappedoperator.py       | 108 +++++++++++++++++++++
 3 files changed, 117 insertions(+), 1 deletion(-)

diff --git a/airflow-core/newsfragments/67684.bugfix.rst 
b/airflow-core/newsfragments/67684.bugfix.rst
new file mode 100644
index 00000000000..9cf4ce96c85
--- /dev/null
+++ b/airflow-core/newsfragments/67684.bugfix.rst
@@ -0,0 +1 @@
+Fix ``ONE_FAILED``/``ONE_SUCCESS``/``ONE_DONE`` trigger rules inside a mapped 
task group being evaluated against every upstream instance instead of the 
upstream instance sharing the task's own map index, which wrongly triggered the 
rule for every expanded instance when only one upstream had failed (or 
succeeded).
diff --git a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
index 4943913d328..eab9792ba32 100644
--- a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -175,7 +175,14 @@ class TriggerRuleDep(BaseTIDep):
                 assert task.dag
                 assert task.task_group
 
-            if is_mapped(task.task_group):
+            # Only the not-yet-expanded summary ti (map_index < 0) needs the 
broad
+            # "depend on every upstream ti" behavior, so a fast-triggered rule
+            # (ONE_SUCCESS / ONE_FAILED / ONE_DONE) does not skip it before the
+            # mapped task group has expanded (see #34023). Once the ti is 
expanded,
+            # each instance must depend on the upstream instance(s) that share 
its
+            # map index, otherwise a single upstream failure would wrongly 
trigger
+            # every expanded instance (see #50210).
+            if is_mapped(task.task_group) and ti.map_index < 0:
                 is_fast_triggered = task.trigger_rule in (TR.ONE_SUCCESS, 
TR.ONE_FAILED, TR.ONE_DONE)
                 if is_fast_triggered and upstream_id not in set(
                     _iter_expansion_dependencies(task_group=task.task_group)
diff --git a/airflow-core/tests/unit/models/test_mappedoperator.py 
b/airflow-core/tests/unit/models/test_mappedoperator.py
index b5bbbd7f3a9..e00a821e056 100644
--- a/airflow-core/tests/unit/models/test_mappedoperator.py
+++ b/airflow-core/tests/unit/models/test_mappedoperator.py
@@ -1601,6 +1601,114 @@ def 
test_mapped_tasks_in_mapped_task_group_waits_for_upstreams_to_complete(dag_m
     assert not ti3.state
 
 
+def test_one_failed_trigger_rule_in_mapped_task_group_is_per_index(dag_maker):
+    """Regression test for #50210.
+
+    A task with the ``ONE_FAILED`` trigger rule inside a mapped task group must
+    be evaluated against the upstream instance that shares its own map index,
+    not against every upstream instance of the group. Otherwise a single failed
+    upstream instance would wrongly trigger the rule for every expanded 
instance.
+    """
+    with dag_maker(dag_id="test_one_failed_in_mapped_task_group") as dag:
+
+        @task
+        def divide(i):
+            return 30 / i
+
+        @task(trigger_rule=TriggerRule.ONE_FAILED)
+        def report_failure(i):
+            pass
+
+        @task
+        def report_success(i):
+            pass
+
+        @task
+        def gen_examples():
+            return [0, 1, 2, 3]
+
+        @task_group
+        def divide_and_report(i):
+            divide(i) >> [report_success(i), report_failure(i)]
+
+        divide_and_report.expand(i=gen_examples())
+
+    dr = dag.test()
+
+    states: dict[str, dict[int, str | None]] = defaultdict(dict)
+    for ti in dr.get_task_instances():
+        states[ti.task_id][ti.map_index] = ti.state
+
+    # divide(0) fails (ZeroDivisionError); the rest succeed.
+    assert states["divide_and_report.divide"] == {0: "failed", 1: "success", 
2: "success", 3: "success"}
+    # Only report_failure sharing divide(0)'s map index should run; the rest 
are skipped.
+    assert states["divide_and_report.report_failure"] == {
+        0: "success",
+        1: "skipped",
+        2: "skipped",
+        3: "skipped",
+    }
+    # report_success mirrors the opposite: it is upstream_failed only where 
divide failed.
+    assert states["divide_and_report.report_success"] == {
+        0: "upstream_failed",
+        1: "success",
+        2: "success",
+        3: "success",
+    }
+
+
+def 
test_one_failed_trigger_rule_runs_on_indirect_failure_in_mapped_task_group(dag_maker):
+    """Regression test for #34023.
+
+    A ``ONE_FAILED`` task at the end of a chain inside a mapped task group must
+    still run for every expanded instance whose (indirect) upstream failed, and
+    must not be skipped prematurely before the group has expanded. This guards
+    the end-to-end outcome of the fix that the per-index change in #50210 
builds on.
+    """
+    with dag_maker(dag_id="test_one_failed_indirect_in_mapped_task_group") as 
dag:
+
+        @task
+        def get_records():
+            return ["a", "b", "c"]
+
+        @task
+        def submit_job(record):
+            pass
+
+        @task
+        def fake_sensor(record):
+            raise RuntimeError("boo")
+
+        @task
+        def deliver_record(record):
+            pass
+
+        @task(trigger_rule=TriggerRule.ONE_FAILED)
+        def handle_failed_delivery(record):
+            pass
+
+        @task_group(group_id="deliver_records")
+        def deliver_record_task_group(record):
+            (
+                submit_job(record)
+                >> fake_sensor(record)
+                >> deliver_record(record)
+                >> handle_failed_delivery(record)
+            )
+
+        deliver_record_task_group.expand(record=get_records())
+
+    dr = dag.test()
+
+    states: dict[str, dict[int, str | None]] = defaultdict(dict)
+    for ti in dr.get_task_instances():
+        states[ti.task_id][ti.map_index] = ti.state
+
+    # fake_sensor fails for every index, so handle_failed_delivery must run 
everywhere.
+    assert states["deliver_records.fake_sensor"] == {0: "failed", 1: "failed", 
2: "failed"}
+    assert states["deliver_records.handle_failed_delivery"] == {0: "success", 
1: "success", 2: "success"}
+
+
 def test_mapped_operator_retry_delay_default(dag_maker):
     """
     Test that MappedOperator.retry_delay returns default value when not 
explicitly set.

Reply via email to