This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a0d130a22b36cf03906ead198e69d11979e31a13
Author: Hussein Awala <[email protected]>
AuthorDate: Wed Mar 15 23:58:24 2023 +0100

    Fix `TriggerRuleDep` when the mapped tasks count is 0 (#30084)
    
    * Fix TriggerRuleDep when the mapped tasks count is 0
    
    * merge the two checks in a simple one
    
    (cherry picked from commit 8d22828e2519a356e9e38c78c3efee1d13b45675)
---
 airflow/models/taskinstance.py              | 14 ++++++++------
 tests/ti_deps/deps/test_trigger_rule_dep.py | 28 ++++++++++++++++++++++++++++
 2 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 2221d2579e..03ab48d46a 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2655,6 +2655,14 @@ class TaskInstance(Base, LoggingMixin):
         :return: Specific map index or map indexes to pull, or ``None`` if we
             want to "whole" return value (i.e. no mapped task groups involved).
         """
+        # This value should never be None since we already know the current task
+        # is in a mapped task group, and should have been expanded, despite that,
+        # we need to check that it is not None to satisfy Mypy.
+        # But this value can be 0 when we expand an empty list, for that it is
+        # necessary to check that ti_count is not 0 to avoid dividing by 0.
+        if not ti_count:
+            return None
+
         # Find the innermost common mapped task group between the current task
         # If the current task and the referenced task does not have a common
         # mapped task group, the two are in different task mapping contexts
@@ -2663,12 +2671,6 @@ class TaskInstance(Base, LoggingMixin):
         if common_ancestor is None:
             return None
 
-        # This value should never be None since we already know the current task
-        # is in a mapped task group, and should have been expanded. The check
-        # exists mainly to satisfy Mypy.
-        if ti_count is None:
-            return None
-
         # At this point we know the two tasks share a mapped task group, and we
         # should use a "partial" value. Let's break down the mapped ti count
         # between the ancestor and further expansion happened inside it.
diff --git a/tests/ti_deps/deps/test_trigger_rule_dep.py b/tests/ti_deps/deps/test_trigger_rule_dep.py
index 42c979c93a..bcd72167f8 100644
--- a/tests/ti_deps/deps/test_trigger_rule_dep.py
+++ b/tests/ti_deps/deps/test_trigger_rule_dep.py
@@ -950,6 +950,34 @@ def test_upstream_in_mapped_group_triggers_only_relevant(dag_maker, session):
     assert sorted(tis) == [("t3", -1)]
 
 
+def test_upstream_in_mapped_group_when_mapped_tasks_list_is_empty(dag_maker, session):
+    from airflow.decorators import task, task_group
+
+    with dag_maker(session=session):
+
+        @task
+        def t(x):
+            return x
+
+        @task_group
+        def tg(x):
+            t1 = t.override(task_id="t1")(x=x)
+            return t.override(task_id="t2")(x=t1)
+
+        t2 = tg.expand(x=[])
+        t.override(task_id="t3")(x=t2)
+
+    dr: DagRun = dag_maker.create_dagrun()
+
+    def _one_scheduling_decision_iteration() -> dict[tuple[str, int], TaskInstance]:
+        decision = dr.task_instance_scheduling_decisions(session=session)
+        return {(ti.task_id, ti.map_index): ti for ti in decision.schedulable_tis}
+
+    # should return an empty dict
+    tis = _one_scheduling_decision_iteration()
+    assert tis == {}
+
+
 def test_mapped_task_check_before_expand(dag_maker, session):
     with dag_maker(session=session):
 

Reply via email to