This is an automated email from the ASF dual-hosted git repository.

shahar pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new 99e713efa39 [BACKPORT] Prevent using `trigger_rule=TriggerRule.ALWAYS` 
in a task-generated mapping within bare tasks (#44751) (#44769)
99e713efa39 is described below

commit 99e713efa398bef9d76eb4e8145538e828a05720
Author: Shahar Epstein <[email protected]>
AuthorDate: Sun Dec 8 08:16:33 2024 +0200

    [BACKPORT] Prevent using `trigger_rule=TriggerRule.ALWAYS` in a 
task-generated mapping within bare tasks (#44751) (#44769)
---
 airflow/decorators/base.py                         | 21 +++++++++++
 airflow/utils/task_group.py                        |  4 ++-
 .../dynamic-task-mapping.rst                       | 10 +++---
 newsfragments/44751.bugfix.rst                     |  1 +
 tests/decorators/test_mapped.py                    | 41 ++++++++++++++++++++++
 tests/decorators/test_task_group.py                |  5 +--
 6 files changed, 75 insertions(+), 7 deletions(-)

diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index bcb64aaa6eb..c0d46df67f1 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -403,6 +403,12 @@ class _TaskDecorator(ExpandableFactory, Generic[FParams, 
FReturn, OperatorSubcla
         super()._validate_arg_names(func, kwargs)
 
     def expand(self, **map_kwargs: OperatorExpandArgument) -> XComArg:
+        if self.kwargs.get("trigger_rule") == TriggerRule.ALWAYS and any(
+            [isinstance(expanded, XComArg) for expanded in map_kwargs.values()]
+        ):
+            raise ValueError(
+                "Task-generated mapping within a task using 'expand' is not 
allowed with trigger rule 'always'."
+            )
         if not map_kwargs:
             raise TypeError("no arguments to expand against")
         self._validate_arg_names("expand", map_kwargs)
@@ -416,6 +422,21 @@ class _TaskDecorator(ExpandableFactory, Generic[FParams, 
FReturn, OperatorSubcla
         return self._expand(DictOfListsExpandInput(map_kwargs), strict=False)
 
     def expand_kwargs(self, kwargs: OperatorExpandKwargsArgument, *, strict: 
bool = True) -> XComArg:
+        if (
+            self.kwargs.get("trigger_rule") == TriggerRule.ALWAYS
+            and not isinstance(kwargs, XComArg)
+            and any(
+                [
+                    isinstance(v, XComArg)
+                    for kwarg in kwargs
+                    if not isinstance(kwarg, XComArg)
+                    for v in kwarg.values()
+                ]
+            )
+        ):
+            raise ValueError(
+                "Task-generated mapping within a task using 'expand_kwargs' is 
not allowed with trigger rule 'always'."
+            )
         if isinstance(kwargs, Sequence):
             for item in kwargs:
                 if not isinstance(item, (XComArg, Mapping)):
diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py
index f5e95bde1a8..2a4dadf5fd6 100644
--- a/airflow/utils/task_group.py
+++ b/airflow/utils/task_group.py
@@ -610,7 +610,9 @@ class MappedTaskGroup(TaskGroup):
 
         for child in self.children.values():
             if isinstance(child, AbstractOperator) and child.trigger_rule == 
TriggerRule.ALWAYS:
-                raise ValueError("Tasks in a mapped task group cannot have 
trigger_rule set to 'ALWAYS'")
+                raise ValueError(
+                    "Task-generated mapping within a mapped task group is not 
allowed with trigger rule 'always'"
+                )
             yield from self._iter_child(child)
 
     def iter_mapped_dependencies(self) -> Iterator[Operator]:
diff --git 
a/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst 
b/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
index df74038fd2c..7607fd18a27 100644
--- a/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
@@ -84,10 +84,6 @@ The grid view also provides visibility into your mapped 
tasks in the details pan
 
     Although we show a "reduce" task here (``sum_it``) you don't have to have 
one, the mapped tasks will still be executed even if they have no downstream 
tasks.
 
-.. warning:: ``TriggerRule.ALWAYS`` cannot be utilized in expanded tasks
-
-    Assigning ``trigger_rule=TriggerRule.ALWAYS`` in expanded tasks is 
forbidden, as expanded parameters will be undefined with the task's immediate 
execution.
-    This is enforced at the time of the DAG parsing, and will raise an error 
if you try to use it.
 
 Task-generated Mapping
 ----------------------
@@ -113,6 +109,12 @@ The above examples we've shown could all be achieved with 
a ``for`` loop in the
 
 The ``make_list`` task runs as a normal task and must return a list or dict 
(see `What data types can be expanded?`_), and then the ``consumer`` task will 
be called four times, once with each value in the return of ``make_list``.
 
+.. warning:: Task-generated mapping cannot be utilized with 
``TriggerRule.ALWAYS``
+
+    Assigning ``trigger_rule=TriggerRule.ALWAYS`` to a task that uses task-generated mapping is not allowed, as the expanded parameters would still be undefined when the task executes immediately.
+    This is enforced at DAG parsing time, for both tasks and mapped task groups, and will raise an error if you try to use it.
+    In the example above, setting ``trigger_rule=TriggerRule.ALWAYS`` in the ``consumer`` task will raise an error, since ``consumer`` is expanded over the output of ``make_list`` (a task-generated mapping).
+
 Repeated mapping
 ----------------
 
diff --git a/newsfragments/44751.bugfix.rst b/newsfragments/44751.bugfix.rst
new file mode 100644
index 00000000000..c85601d0fe1
--- /dev/null
+++ b/newsfragments/44751.bugfix.rst
@@ -0,0 +1 @@
+``TriggerRule.ALWAYS`` cannot be utilized within a task-generated mapping, either in bare tasks (fixed in this PR) or mapped task groups (fixed in PR #44368). The issue with doing so is that the task is executed immediately without waiting for the upstream task's mapping results, which leads to failure of the task. This fix avoids it by raising an exception when it is detected during DAG parsing.
diff --git a/tests/decorators/test_mapped.py b/tests/decorators/test_mapped.py
index 3812367425f..541d327a975 100644
--- a/tests/decorators/test_mapped.py
+++ b/tests/decorators/test_mapped.py
@@ -17,6 +17,9 @@
 # under the License.
 from __future__ import annotations
 
+import pytest
+
+from airflow.decorators import task
 from airflow.models.dag import DAG
 from airflow.utils.task_group import TaskGroup
 from tests.models import DEFAULT_DATE
@@ -36,3 +39,41 @@ def test_mapped_task_group_id_prefix_task_id():
 
     dag.get_task("t1") == x1.operator
     dag.get_task("g.t2") == x2.operator
+
+
[email protected]_test
+def 
test_fail_task_generated_mapping_with_trigger_rule_always__exapnd(dag_maker, 
session):
+    with DAG(dag_id="d", schedule=None, start_date=DEFAULT_DATE):
+
+        @task
+        def get_input():
+            return ["world", "moon"]
+
+        @task(trigger_rule="always")
+        def hello(input):
+            print(f"Hello, {input}")
+
+        with pytest.raises(
+            ValueError,
+            match="Task-generated mapping within a task using 'expand' is not 
allowed with trigger rule 'always'",
+        ):
+            hello.expand(input=get_input())
+
+
[email protected]_test
+def 
test_fail_task_generated_mapping_with_trigger_rule_always__exapnd_kwargs(dag_maker,
 session):
+    with DAG(dag_id="d", schedule=None, start_date=DEFAULT_DATE):
+
+        @task
+        def get_input():
+            return ["world", "moon"]
+
+        @task(trigger_rule="always")
+        def hello(input, input2):
+            print(f"Hello, {input}, {input2}")
+
+        with pytest.raises(
+            ValueError,
+            match="Task-generated mapping within a task using 'expand_kwargs' 
is not allowed with trigger rule 'always'",
+        ):
+            hello.expand_kwargs([{"input": get_input(), "input2": 
get_input()}])
diff --git a/tests/decorators/test_task_group.py 
b/tests/decorators/test_task_group.py
index 2dab23ca38f..ce1b518a8ff 100644
--- a/tests/decorators/test_task_group.py
+++ b/tests/decorators/test_task_group.py
@@ -135,7 +135,7 @@ def test_expand_fail_empty():
 
 
 @pytest.mark.db_test
-def test_expand_fail_trigger_rule_always(dag_maker, session):
+def test_fail_task_generated_mapping_with_trigger_rule_always(dag_maker, 
session):
     @dag(schedule=None, start_date=pendulum.datetime(2022, 1, 1))
     def pipeline():
         @task
@@ -151,7 +151,8 @@ def test_expand_fail_trigger_rule_always(dag_maker, 
session):
             t1(param)
 
         with pytest.raises(
-            ValueError, match="Tasks in a mapped task group cannot have 
trigger_rule set to 'ALWAYS'"
+            ValueError,
+            match="Task-generated mapping within a mapped task group is not 
allowed with trigger rule 'always'",
         ):
             tg.expand(param=get_param())
 

Reply via email to