This is an automated email from the ASF dual-hosted git repository. pierrejeambrun pushed a commit to branch v2-5-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit a0d130a22b36cf03906ead198e69d11979e31a13
Author: Hussein Awala <[email protected]>
AuthorDate: Wed Mar 15 23:58:24 2023 +0100

    Fix `TriggerRuleDep` when the mapped tasks count is 0 (#30084)

    * Fix TriggerRuleDep when the mapped tasks count is 0

    * merge the two checks in a simple one

    (cherry picked from commit 8d22828e2519a356e9e38c78c3efee1d13b45675)
---
 airflow/models/taskinstance.py              | 14 ++++++++------
 tests/ti_deps/deps/test_trigger_rule_dep.py | 28 ++++++++++++++++++++++++++++
 2 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 2221d2579e..03ab48d46a 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2655,6 +2655,14 @@ class TaskInstance(Base, LoggingMixin):
         :return: Specific map index or map indexes to pull, or ``None`` if we
             want to "whole" return value (i.e. no mapped task groups involved).
         """
+        # This value should never be None since we already know the current task
+        # is in a mapped task group, and should have been expanded, despite that,
+        # we need to check that it is not None to satisfy Mypy.
+        # But this value can be 0 when we expand an empty list, for that it is
+        # necessary to check that ti_count is not 0 to avoid dividing by 0.
+        if not ti_count:
+            return None
+
         # Find the innermost common mapped task group between the current task
         # If the current task and the referenced task does not have a common
         # mapped task group, the two are in different task mapping contexts
@@ -2663,12 +2671,6 @@ class TaskInstance(Base, LoggingMixin):
         if common_ancestor is None:
             return None
 
-        # This value should never be None since we already know the current task
-        # is in a mapped task group, and should have been expanded. The check
-        # exists mainly to satisfy Mypy.
-        if ti_count is None:
-            return None
-
         # At this point we know the two tasks share a mapped task group, and we
         # should use a "partial" value. Let's break down the mapped ti count
         # between the ancestor and further expansion happened inside it.
diff --git a/tests/ti_deps/deps/test_trigger_rule_dep.py b/tests/ti_deps/deps/test_trigger_rule_dep.py
index 42c979c93a..bcd72167f8 100644
--- a/tests/ti_deps/deps/test_trigger_rule_dep.py
+++ b/tests/ti_deps/deps/test_trigger_rule_dep.py
@@ -950,6 +950,34 @@ def test_upstream_in_mapped_group_triggers_only_relevant(dag_maker, session):
     assert sorted(tis) == [("t3", -1)]
 
 
+def test_upstream_in_mapped_group_when_mapped_tasks_list_is_empty(dag_maker, session):
+    from airflow.decorators import task, task_group
+
+    with dag_maker(session=session):
+
+        @task
+        def t(x):
+            return x
+
+        @task_group
+        def tg(x):
+            t1 = t.override(task_id="t1")(x=x)
+            return t.override(task_id="t2")(x=t1)
+
+        t2 = tg.expand(x=[])
+        t.override(task_id="t3")(x=t2)
+
+    dr: DagRun = dag_maker.create_dagrun()
+
+    def _one_scheduling_decision_iteration() -> dict[tuple[str, int], TaskInstance]:
+        decision = dr.task_instance_scheduling_decisions(session=session)
+        return {(ti.task_id, ti.map_index): ti for ti in decision.schedulable_tis}
+
+    # should return an empty dict
+    tis = _one_scheduling_decision_iteration()
+    assert tis == {}
+
+
 def test_mapped_task_check_before_expand(dag_maker, session):
     with dag_maker(session=session):
 
