This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-7-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 12a801e9a28435605483d2011a0fca121e3864e4 Author: Daniel Standish <[email protected]> AuthorDate: Fri Aug 4 03:26:28 2023 -0700 Don't auto-add to context just by virtue of arrowing (#33102) * Don't auto-add to context just by virtue of arrowing * no add tasks * Add back removed import * Fixup tests * fixup! Fixup tests --------- Co-authored-by: Ephraim Anierobi <[email protected]> (cherry picked from commit c18a5a9f8630d7b9ec464eb30d61f084d01ffcec) --- airflow/models/taskmixin.py | 21 ------- tests/decorators/test_setup_teardown.py | 53 ------------------ tests/models/test_taskmixin.py | 97 --------------------------------- tests/utils/test_task_group.py | 71 +----------------------- 4 files changed, 1 insertion(+), 241 deletions(-) diff --git a/airflow/models/taskmixin.py b/airflow/models/taskmixin.py index 0dd013a579..f52749c7ff 100644 --- a/airflow/models/taskmixin.py +++ b/airflow/models/taskmixin.py @@ -24,7 +24,6 @@ import pendulum from airflow.exceptions import AirflowException, RemovedInAirflow3Warning from airflow.serialization.enums import DagAttributeTypes -from airflow.utils.setup_teardown import SetupTeardownContext from airflow.utils.types import NOTSET, ArgNotSet if TYPE_CHECKING: @@ -97,15 +96,11 @@ class DependencyMixin: def __lshift__(self, other: DependencyMixin | Sequence[DependencyMixin]): """Implements Task << Task.""" self.set_upstream(other) - self.set_setup_teardown_ctx_dependencies(other) - self.set_taskgroup_ctx_dependencies(other) return other def __rshift__(self, other: DependencyMixin | Sequence[DependencyMixin]): """Implements Task >> Task.""" self.set_downstream(other) - self.set_setup_teardown_ctx_dependencies(other) - self.set_taskgroup_ctx_dependencies(other) return other def __rrshift__(self, other: DependencyMixin | Sequence[DependencyMixin]): @@ -136,22 +131,6 @@ class DependencyMixin: for o in obj: yield from cls._iter_references(o) - def set_setup_teardown_ctx_dependencies(self, other: DependencyMixin | Sequence[DependencyMixin]): - if not SetupTeardownContext.active: - return - for op, _ in self._iter_references([self, other]): - SetupTeardownContext.update_context_map(op) - - def set_taskgroup_ctx_dependencies(self, other: DependencyMixin | Sequence[DependencyMixin]): - from airflow.utils.task_group import TaskGroupContext - - if not TaskGroupContext.active: - return - task_group = TaskGroupContext.get_current_task_group(None) - for op, _ in self._iter_references([self, other]): - if task_group: - op.add_to_taskgroup(task_group) - class TaskMixin(DependencyMixin): """Mixin to provide task-related things. diff --git a/tests/decorators/test_setup_teardown.py b/tests/decorators/test_setup_teardown.py index bcfb81b0ba..567ac5c9ab 100644 --- a/tests/decorators/test_setup_teardown.py +++ b/tests/decorators/test_setup_teardown.py @@ -1130,59 +1130,6 @@ class TestSetupTearDownTask: "mytask", } - def test_tasks_decorators_called_outside_context_manager_can_link_up(self, dag_maker): - @setup - def setuptask(): - print("setup") - - @task() - def mytask(): - print("mytask") - - @task() - def mytask2(): - print("mytask 2") - - @teardown - def teardowntask(): - print("teardown") - - with dag_maker() as dag: - task1 = mytask() - task2 = mytask2() - with setuptask() >> teardowntask(): - task1 >> task2 - - assert len(dag.task_group.children) == 4 - assert not dag.task_group.children["setuptask"].upstream_task_ids - assert dag.task_group.children["setuptask"].downstream_task_ids == {"mytask", "teardowntask"} - assert dag.task_group.children["mytask"].upstream_task_ids == {"setuptask"} - assert dag.task_group.children["mytask"].downstream_task_ids == {"mytask2"} - assert dag.task_group.children["mytask2"].upstream_task_ids == {"mytask"} - assert dag.task_group.children["mytask2"].downstream_task_ids == {"teardowntask"} - assert dag.task_group.children["teardowntask"].upstream_task_ids == {"mytask2", "setuptask"} - assert not dag.task_group.children["teardowntask"].downstream_task_ids - - def test_classic_tasks_called_outside_context_manager_can_link_up(self, dag_maker): - - with dag_maker() as dag: - setuptask = BashOperator(task_id="setuptask", bash_command="echo 1").as_setup() - teardowntask = BashOperator(task_id="teardowntask", bash_command="echo 1").as_teardown() - mytask = BashOperator(task_id="mytask", bash_command="echo 1") - mytask2 = BashOperator(task_id="mytask2", bash_command="echo 1") - with setuptask >> teardowntask: - mytask >> mytask2 - - assert len(dag.task_group.children) == 4 - assert not dag.task_group.children["setuptask"].upstream_task_ids - assert dag.task_group.children["setuptask"].downstream_task_ids == {"mytask", "teardowntask"} - assert dag.task_group.children["mytask"].upstream_task_ids == {"setuptask"} - assert dag.task_group.children["mytask"].downstream_task_ids == {"mytask2"} - assert dag.task_group.children["mytask2"].upstream_task_ids == {"mytask"} - assert dag.task_group.children["mytask2"].downstream_task_ids == {"teardowntask"} - assert dag.task_group.children["teardowntask"].upstream_task_ids == {"mytask2", "setuptask"} - assert not dag.task_group.children["teardowntask"].downstream_task_ids - def test_tasks_decorators_called_outside_context_manager_can_link_up_with_scope(self, dag_maker): @setup def setuptask(): diff --git a/tests/models/test_taskmixin.py b/tests/models/test_taskmixin.py index c1795f22a8..95aefd0faa 100644 --- a/tests/models/test_taskmixin.py +++ b/tests/models/test_taskmixin.py @@ -209,100 +209,3 @@ def test_cannot_set_on_failure_fail_dagrun_unless_teardown_taskflow(dag_maker): ValueError, match="Cannot mark task 'my_ok_task__2' as setup; task is already a teardown." ): m.operator.is_setup = True - - -def test_set_setup_teardown_ctx_dependencies_using_decorated_tasks(dag_maker): - - with dag_maker(): - t1 = make_task("t1", type_="decorated") - setuptask = make_task("setuptask", type_="decorated", setup_=True) - teardowntask = make_task("teardowntask", type_="decorated", teardown_=True) - with setuptask >> teardowntask as scope: - scope.add_task(t1) - - assert t1.operator.upstream_task_ids == {"setuptask"} - assert t1.operator.downstream_task_ids == {"teardowntask"} - - with dag_maker(): - t1 = make_task("t1", type_="decorated") - t2 = make_task("t2", type_="decorated") - setuptask = make_task("setuptask", type_="decorated", setup_=True) - teardowntask = make_task("teardowntask", type_="decorated", teardown_=True) - with setuptask >> teardowntask: - t1 >> t2 - assert t1.operator.upstream_task_ids == {"setuptask"} - assert t2.operator.downstream_task_ids == {"teardowntask"} - - with dag_maker(): - t1 = make_task("t1", type_="decorated") - t2 = make_task("t2", type_="decorated") - t3 = make_task("t3", type_="decorated") - setuptask = make_task("setuptask", type_="decorated", setup_=True) - teardowntask = make_task("teardowntask", type_="decorated", teardown_=True) - with setuptask >> teardowntask: - t1 >> [t2, t3] - - assert t1.operator.upstream_task_ids == {"setuptask"} - assert t2.operator.downstream_task_ids == {"teardowntask"} - assert t3.operator.downstream_task_ids == {"teardowntask"} - - with dag_maker(): - t1 = make_task("t1", type_="decorated") - t2 = make_task("t2", type_="decorated") - t3 = make_task("t3", type_="decorated") - setuptask = make_task("setuptask", type_="decorated", setup_=True) - teardowntask = make_task("teardowntask", type_="decorated", teardown_=True) - with setuptask >> teardowntask: - [t1, t2] >> t3 - - assert t1.operator.upstream_task_ids == {"setuptask"} - assert t2.operator.upstream_task_ids == {"setuptask"} - assert t3.operator.downstream_task_ids == {"teardowntask"} - - -def test_set_setup_teardown_ctx_dependencies_using_classic_tasks(dag_maker): - with dag_maker(): - t1 = make_task("t1", type_="classic") - setuptask = make_task("setuptask", type_="classic", setup_=True) - teardowntask = make_task("teardowntask", type_="classic", teardown_=True) - with setuptask >> teardowntask as scope: - scope.add_task(t1) - - assert t1.upstream_task_ids == {"setuptask"} - assert t1.downstream_task_ids == {"teardowntask"} - - with dag_maker(): - t1 = make_task("t1", type_="classic") - t2 = make_task("t2", type_="classic") - setuptask = make_task("setuptask", type_="classic", setup_=True) - teardowntask = make_task("teardowntask", type_="classic", teardown_=True) - with setuptask >> teardowntask: - t1 >> t2 - assert t1.upstream_task_ids == {"setuptask"} - assert t2.downstream_task_ids == {"teardowntask"} - - with dag_maker(): - t1 = make_task("t1", type_="classic") - t2 = make_task("t2", type_="classic") - t3 = make_task("t3", type_="classic") - setuptask = make_task("setuptask", type_="classic", setup_=True) - teardowntask = make_task("teardowntask", type_="classic", teardown_=True) - with setuptask >> teardowntask: - t1 >> [t2, t3] - - assert t1.upstream_task_ids == {"setuptask"} - assert t2.downstream_task_ids == {"teardowntask"} - assert t3.downstream_task_ids == {"teardowntask"} - - with dag_maker(): - t1 = make_task("t1", type_="classic") - t2 = make_task("t2", type_="classic") - t3 = make_task("t3", type_="classic") - setuptask = make_task("setuptask", type_="classic", setup_=True) - teardowntask = make_task("teardowntask", type_="classic", teardown_=True) - with setuptask >> teardowntask: - [t1, t2] >> t3 - - assert t1.upstream_task_ids == {"setuptask"} - assert t2.upstream_task_ids == {"setuptask"} - assert t3.downstream_task_ids == {"teardowntask"} diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py index 4475e7141a..c021d98b88 100644 --- a/tests/utils/test_task_group.py +++ b/tests/utils/test_task_group.py @@ -23,7 +23,7 @@ import pendulum import pytest from airflow.decorators import dag, task as task_decorator, task_group as task_group_decorator -from airflow.exceptions import AirflowException, TaskAlreadyInTaskGroup +from airflow.exceptions import TaskAlreadyInTaskGroup from airflow.models.baseoperator import BaseOperator from airflow.models.dag import DAG from airflow.models.xcom_arg import XComArg @@ -1479,72 +1479,3 @@ def test_task_group_arrow_with_setups_teardowns(): tg1 >> w2 assert t1.downstream_task_ids == set() assert w1.downstream_task_ids == {"tg1.t1", "w2"} - - -def test_tasks_defined_outside_taskgrooup(dag_maker): - # Test that classic tasks defined outside a task group are added to the root task group - # when the relationships are defined inside the task group - with dag_maker() as dag: - t1 = make_task("t1") - t2 = make_task("t2") - t3 = make_task("t3") - with TaskGroup(group_id="tg1"): - t1 >> t2 >> t3 - dag.validate() - assert dag.task_group.children.keys() == {"tg1"} - assert dag.task_group.children["tg1"].children.keys() == {"t1", "t2", "t3"} - assert dag.task_group.children["tg1"].children["t1"].upstream_task_ids == set() - assert dag.task_group.children["tg1"].children["t1"].downstream_task_ids == {"t2"} - assert dag.task_group.children["tg1"].children["t2"].upstream_task_ids == {"t1"} - assert dag.task_group.children["tg1"].children["t2"].downstream_task_ids == {"t3"} - assert dag.task_group.children["tg1"].children["t3"].upstream_task_ids == {"t2"} - assert dag.task_group.children["tg1"].children["t3"].downstream_task_ids == set() - - # Test that decorated tasks defined outside a task group are added to the root task group - # when relationships are defined inside the task group - with dag_maker() as dag: - t1 = make_task("t1", type_="decorated") - t2 = make_task("t2", type_="decorated") - t3 = make_task("t3", type_="decorated") - with TaskGroup(group_id="tg1"): - t1 >> t2 >> t3 - dag.validate() - assert dag.task_group.children.keys() == {"tg1"} - assert dag.task_group.children["tg1"].children.keys() == {"t1", "t2", "t3"} - assert dag.task_group.children["tg1"].children["t1"].upstream_task_ids == set() - assert dag.task_group.children["tg1"].children["t1"].downstream_task_ids == {"t2"} - assert dag.task_group.children["tg1"].children["t2"].upstream_task_ids == {"t1"} - assert dag.task_group.children["tg1"].children["t2"].downstream_task_ids == {"t3"} - assert dag.task_group.children["tg1"].children["t3"].upstream_task_ids == {"t2"} - assert dag.task_group.children["tg1"].children["t3"].downstream_task_ids == set() - - # Test adding single decorated task defined outside a task group to a task group - with dag_maker() as dag: - t1 = make_task("t1", type_="decorated") - with TaskGroup(group_id="tg1") as tg1: - tg1.add_task(t1) - dag.validate() - assert dag.task_group.children.keys() == {"tg1"} - assert dag.task_group.children["tg1"].children.keys() == {"t1"} - assert dag.task_group.children["tg1"].children["t1"].upstream_task_ids == set() - assert dag.task_group.children["tg1"].children["t1"].downstream_task_ids == set() - - # Test adding single classic task defined outside a task group to a task group - with dag_maker() as dag: - t1 = make_task("t1") - with TaskGroup(group_id="tg1") as tg1: - tg1.add_task(t1) - dag.validate() - assert dag.task_group.children.keys() == {"tg1"} - assert dag.task_group.children["tg1"].children.keys() == {"t1"} - assert dag.task_group.children["tg1"].children["t1"].upstream_task_ids == set() - assert dag.task_group.children["tg1"].children["t1"].downstream_task_ids == set() - - with pytest.raises( - AirflowException, - match="Using this method on a task group that's not a context manager is not supported.", - ): - with dag_maker(): - t1 = make_task("t1") - tg1 = TaskGroup(group_id="tg1") - tg1.add_task(t1)
