This is an automated email from the ASF dual-hosted git repository.

shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f6e0900b9e0 Prevent using trigger_rule="always" in a dynamic mapped task (#43368)
f6e0900b9e0 is described below

commit f6e0900b9e0269ba35f2239e3e4f950a117e1ede
Author: Shahar Epstein <[email protected]>
AuthorDate: Fri Nov 8 08:48:15 2024 +0200

    Prevent using trigger_rule="always" in a dynamic mapped task (#43368)
---
 .../dynamic-task-mapping.rst                       |  5 +++++
 task_sdk/src/airflow/sdk/definitions/taskgroup.py  | 22 +++++++++++++++----
 tests/decorators/test_task_group.py                | 25 +++++++++++++++++++++-
 3 files changed, 47 insertions(+), 5 deletions(-)

diff --git a/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst b/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
index c01ab7b1cf4..426a720781e 100644
--- a/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst
@@ -84,6 +84,11 @@ The grid view also provides visibility into your mapped tasks in the details pan
 
     Although we show a "reduce" task here (``sum_it``) you don't have to have one, the mapped tasks will still be executed even if they have no downstream tasks.
 
+.. warning:: ``TriggerRule.ALWAYS`` cannot be utilized in expanded tasks
+
+    Assigning ``trigger_rule=TriggerRule.ALWAYS`` in expanded tasks is forbidden, as expanded parameters will be undefined with the task's immediate execution.
+    This is enforced at the time of the DAG parsing, and will raise an error if you try to use it.
+
 Task-generated Mapping
 ----------------------
 
diff --git a/task_sdk/src/airflow/sdk/definitions/taskgroup.py b/task_sdk/src/airflow/sdk/definitions/taskgroup.py
index de4bd0c771a..1c8d1ded824 100644
--- a/task_sdk/src/airflow/sdk/definitions/taskgroup.py
+++ b/task_sdk/src/airflow/sdk/definitions/taskgroup.py
@@ -37,6 +37,7 @@ from airflow.exceptions import (
     TaskAlreadyInTaskGroup,
 )
 from airflow.sdk.definitions.node import DAGNode
+from airflow.utils.trigger_rule import TriggerRule
 
 if TYPE_CHECKING:
     from airflow.models.expandinput import ExpandInput
@@ -195,10 +196,15 @@ class TaskGroup(DAGNode):
 
     def __iter__(self):
         for child in self.children.values():
-            if isinstance(child, TaskGroup):
-                yield from child
-            else:
-                yield child
+            yield from self._iter_child(child)
+
+    @staticmethod
+    def _iter_child(child):
+        """Iterate over the children of this TaskGroup."""
+        if isinstance(child, TaskGroup):
+            yield from child
+        else:
+            yield child
 
     def add(self, task: DAGNode) -> DAGNode:
         """
@@ -574,6 +580,14 @@ class MappedTaskGroup(TaskGroup):
         super().__init__(**kwargs)
         self._expand_input = expand_input
 
+    def __iter__(self):
+        from airflow.models.abstractoperator import AbstractOperator
+
+        for child in self.children.values():
+            if isinstance(child, AbstractOperator) and child.trigger_rule == TriggerRule.ALWAYS:
+                raise ValueError("Tasks in a mapped task group cannot have trigger_rule set to 'ALWAYS'")
+            yield from self._iter_child(child)
+
     def iter_mapped_dependencies(self) -> Iterator[DAGNode]:
         """Upstream dependencies that provide XComs used by this mapped task group."""
         from airflow.models.xcom_arg import XComArg
diff --git a/tests/decorators/test_task_group.py b/tests/decorators/test_task_group.py
index 6120f94af3a..2dab23ca38f 100644
--- a/tests/decorators/test_task_group.py
+++ b/tests/decorators/test_task_group.py
@@ -22,10 +22,11 @@ from datetime import timedelta
 import pendulum
 import pytest
 
-from airflow.decorators import dag, task_group
+from airflow.decorators import dag, task, task_group
 from airflow.models.expandinput import DictOfListsExpandInput, ListOfDictsExpandInput, MappedArgument
 from airflow.operators.empty import EmptyOperator
 from airflow.utils.task_group import MappedTaskGroup
+from airflow.utils.trigger_rule import TriggerRule
 
 
 def test_task_group_with_overridden_kwargs():
@@ -133,6 +134,28 @@ def test_expand_fail_empty():
     assert str(ctx.value) == "no arguments to expand against"
 
 
[email protected]_test
+def test_expand_fail_trigger_rule_always(dag_maker, session):
+    @dag(schedule=None, start_date=pendulum.datetime(2022, 1, 1))
+    def pipeline():
+        @task
+        def get_param():
+            return ["a", "b", "c"]
+
+        @task(trigger_rule=TriggerRule.ALWAYS)
+        def t1(param):
+            return param
+
+        @task_group()
+        def tg(param):
+            t1(param)
+
+        with pytest.raises(
+            ValueError, match="Tasks in a mapped task group cannot have trigger_rule set to 'ALWAYS'"
+        ):
+            tg.expand(param=get_param())
+
+
 def test_expand_create_mapped():
     saved = {}
 

Reply via email to