This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c18a5a9f86 Don't auto-add to context just by virtue of arrowing 
(#33102)
c18a5a9f86 is described below

commit c18a5a9f8630d7b9ec464eb30d61f084d01ffcec
Author: Daniel Standish <[email protected]>
AuthorDate: Fri Aug 4 03:26:28 2023 -0700

    Don't auto-add to context just by virtue of arrowing (#33102)
    
    * Don't auto-add to context just by virtue of arrowing
    
    * no add tasks
    
    * Add back removed import
    
    * Fixup tests
    
    * fixup! Fixup tests
    
    ---------
    
    Co-authored-by: Ephraim Anierobi <[email protected]>
---
 airflow/models/taskmixin.py             | 21 -------
 tests/decorators/test_setup_teardown.py | 53 ------------------
 tests/models/test_taskmixin.py          | 97 ---------------------------------
 tests/utils/test_task_group.py          | 71 +-----------------------
 4 files changed, 1 insertion(+), 241 deletions(-)

diff --git a/airflow/models/taskmixin.py b/airflow/models/taskmixin.py
index 0dd013a579..f52749c7ff 100644
--- a/airflow/models/taskmixin.py
+++ b/airflow/models/taskmixin.py
@@ -24,7 +24,6 @@ import pendulum
 
 from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
 from airflow.serialization.enums import DagAttributeTypes
-from airflow.utils.setup_teardown import SetupTeardownContext
 from airflow.utils.types import NOTSET, ArgNotSet
 
 if TYPE_CHECKING:
@@ -97,15 +96,11 @@ class DependencyMixin:
     def __lshift__(self, other: DependencyMixin | Sequence[DependencyMixin]):
         """Implements Task << Task."""
         self.set_upstream(other)
-        self.set_setup_teardown_ctx_dependencies(other)
-        self.set_taskgroup_ctx_dependencies(other)
         return other
 
     def __rshift__(self, other: DependencyMixin | Sequence[DependencyMixin]):
         """Implements Task >> Task."""
         self.set_downstream(other)
-        self.set_setup_teardown_ctx_dependencies(other)
-        self.set_taskgroup_ctx_dependencies(other)
         return other
 
     def __rrshift__(self, other: DependencyMixin | Sequence[DependencyMixin]):
@@ -136,22 +131,6 @@ class DependencyMixin:
             for o in obj:
                 yield from cls._iter_references(o)
 
-    def set_setup_teardown_ctx_dependencies(self, other: DependencyMixin | 
Sequence[DependencyMixin]):
-        if not SetupTeardownContext.active:
-            return
-        for op, _ in self._iter_references([self, other]):
-            SetupTeardownContext.update_context_map(op)
-
-    def set_taskgroup_ctx_dependencies(self, other: DependencyMixin | 
Sequence[DependencyMixin]):
-        from airflow.utils.task_group import TaskGroupContext
-
-        if not TaskGroupContext.active:
-            return
-        task_group = TaskGroupContext.get_current_task_group(None)
-        for op, _ in self._iter_references([self, other]):
-            if task_group:
-                op.add_to_taskgroup(task_group)
-
 
 class TaskMixin(DependencyMixin):
     """Mixin to provide task-related things.
diff --git a/tests/decorators/test_setup_teardown.py 
b/tests/decorators/test_setup_teardown.py
index bcfb81b0ba..567ac5c9ab 100644
--- a/tests/decorators/test_setup_teardown.py
+++ b/tests/decorators/test_setup_teardown.py
@@ -1130,59 +1130,6 @@ class TestSetupTearDownTask:
             "mytask",
         }
 
-    def test_tasks_decorators_called_outside_context_manager_can_link_up(self, 
dag_maker):
-        @setup
-        def setuptask():
-            print("setup")
-
-        @task()
-        def mytask():
-            print("mytask")
-
-        @task()
-        def mytask2():
-            print("mytask 2")
-
-        @teardown
-        def teardowntask():
-            print("teardown")
-
-        with dag_maker() as dag:
-            task1 = mytask()
-            task2 = mytask2()
-            with setuptask() >> teardowntask():
-                task1 >> task2
-
-        assert len(dag.task_group.children) == 4
-        assert not dag.task_group.children["setuptask"].upstream_task_ids
-        assert dag.task_group.children["setuptask"].downstream_task_ids == 
{"mytask", "teardowntask"}
-        assert dag.task_group.children["mytask"].upstream_task_ids == 
{"setuptask"}
-        assert dag.task_group.children["mytask"].downstream_task_ids == 
{"mytask2"}
-        assert dag.task_group.children["mytask2"].upstream_task_ids == 
{"mytask"}
-        assert dag.task_group.children["mytask2"].downstream_task_ids == 
{"teardowntask"}
-        assert dag.task_group.children["teardowntask"].upstream_task_ids == 
{"mytask2", "setuptask"}
-        assert not dag.task_group.children["teardowntask"].downstream_task_ids
-
-    def test_classic_tasks_called_outside_context_manager_can_link_up(self, 
dag_maker):
-
-        with dag_maker() as dag:
-            setuptask = BashOperator(task_id="setuptask", bash_command="echo 
1").as_setup()
-            teardowntask = BashOperator(task_id="teardowntask", 
bash_command="echo 1").as_teardown()
-            mytask = BashOperator(task_id="mytask", bash_command="echo 1")
-            mytask2 = BashOperator(task_id="mytask2", bash_command="echo 1")
-            with setuptask >> teardowntask:
-                mytask >> mytask2
-
-        assert len(dag.task_group.children) == 4
-        assert not dag.task_group.children["setuptask"].upstream_task_ids
-        assert dag.task_group.children["setuptask"].downstream_task_ids == 
{"mytask", "teardowntask"}
-        assert dag.task_group.children["mytask"].upstream_task_ids == 
{"setuptask"}
-        assert dag.task_group.children["mytask"].downstream_task_ids == 
{"mytask2"}
-        assert dag.task_group.children["mytask2"].upstream_task_ids == 
{"mytask"}
-        assert dag.task_group.children["mytask2"].downstream_task_ids == 
{"teardowntask"}
-        assert dag.task_group.children["teardowntask"].upstream_task_ids == 
{"mytask2", "setuptask"}
-        assert not dag.task_group.children["teardowntask"].downstream_task_ids
-
     def 
test_tasks_decorators_called_outside_context_manager_can_link_up_with_scope(self,
 dag_maker):
         @setup
         def setuptask():
diff --git a/tests/models/test_taskmixin.py b/tests/models/test_taskmixin.py
index c1795f22a8..95aefd0faa 100644
--- a/tests/models/test_taskmixin.py
+++ b/tests/models/test_taskmixin.py
@@ -209,100 +209,3 @@ def 
test_cannot_set_on_failure_fail_dagrun_unless_teardown_taskflow(dag_maker):
             ValueError, match="Cannot mark task 'my_ok_task__2' as setup; task 
is already a teardown."
         ):
             m.operator.is_setup = True
-
-
-def test_set_setup_teardown_ctx_dependencies_using_decorated_tasks(dag_maker):
-
-    with dag_maker():
-        t1 = make_task("t1", type_="decorated")
-        setuptask = make_task("setuptask", type_="decorated", setup_=True)
-        teardowntask = make_task("teardowntask", type_="decorated", 
teardown_=True)
-        with setuptask >> teardowntask as scope:
-            scope.add_task(t1)
-
-    assert t1.operator.upstream_task_ids == {"setuptask"}
-    assert t1.operator.downstream_task_ids == {"teardowntask"}
-
-    with dag_maker():
-        t1 = make_task("t1", type_="decorated")
-        t2 = make_task("t2", type_="decorated")
-        setuptask = make_task("setuptask", type_="decorated", setup_=True)
-        teardowntask = make_task("teardowntask", type_="decorated", 
teardown_=True)
-        with setuptask >> teardowntask:
-            t1 >> t2
-    assert t1.operator.upstream_task_ids == {"setuptask"}
-    assert t2.operator.downstream_task_ids == {"teardowntask"}
-
-    with dag_maker():
-        t1 = make_task("t1", type_="decorated")
-        t2 = make_task("t2", type_="decorated")
-        t3 = make_task("t3", type_="decorated")
-        setuptask = make_task("setuptask", type_="decorated", setup_=True)
-        teardowntask = make_task("teardowntask", type_="decorated", 
teardown_=True)
-        with setuptask >> teardowntask:
-            t1 >> [t2, t3]
-
-    assert t1.operator.upstream_task_ids == {"setuptask"}
-    assert t2.operator.downstream_task_ids == {"teardowntask"}
-    assert t3.operator.downstream_task_ids == {"teardowntask"}
-
-    with dag_maker():
-        t1 = make_task("t1", type_="decorated")
-        t2 = make_task("t2", type_="decorated")
-        t3 = make_task("t3", type_="decorated")
-        setuptask = make_task("setuptask", type_="decorated", setup_=True)
-        teardowntask = make_task("teardowntask", type_="decorated", 
teardown_=True)
-        with setuptask >> teardowntask:
-            [t1, t2] >> t3
-
-    assert t1.operator.upstream_task_ids == {"setuptask"}
-    assert t2.operator.upstream_task_ids == {"setuptask"}
-    assert t3.operator.downstream_task_ids == {"teardowntask"}
-
-
-def test_set_setup_teardown_ctx_dependencies_using_classic_tasks(dag_maker):
-    with dag_maker():
-        t1 = make_task("t1", type_="classic")
-        setuptask = make_task("setuptask", type_="classic", setup_=True)
-        teardowntask = make_task("teardowntask", type_="classic", 
teardown_=True)
-        with setuptask >> teardowntask as scope:
-            scope.add_task(t1)
-
-    assert t1.upstream_task_ids == {"setuptask"}
-    assert t1.downstream_task_ids == {"teardowntask"}
-
-    with dag_maker():
-        t1 = make_task("t1", type_="classic")
-        t2 = make_task("t2", type_="classic")
-        setuptask = make_task("setuptask", type_="classic", setup_=True)
-        teardowntask = make_task("teardowntask", type_="classic", 
teardown_=True)
-        with setuptask >> teardowntask:
-            t1 >> t2
-    assert t1.upstream_task_ids == {"setuptask"}
-    assert t2.downstream_task_ids == {"teardowntask"}
-
-    with dag_maker():
-        t1 = make_task("t1", type_="classic")
-        t2 = make_task("t2", type_="classic")
-        t3 = make_task("t3", type_="classic")
-        setuptask = make_task("setuptask", type_="classic", setup_=True)
-        teardowntask = make_task("teardowntask", type_="classic", 
teardown_=True)
-        with setuptask >> teardowntask:
-            t1 >> [t2, t3]
-
-    assert t1.upstream_task_ids == {"setuptask"}
-    assert t2.downstream_task_ids == {"teardowntask"}
-    assert t3.downstream_task_ids == {"teardowntask"}
-
-    with dag_maker():
-        t1 = make_task("t1", type_="classic")
-        t2 = make_task("t2", type_="classic")
-        t3 = make_task("t3", type_="classic")
-        setuptask = make_task("setuptask", type_="classic", setup_=True)
-        teardowntask = make_task("teardowntask", type_="classic", 
teardown_=True)
-        with setuptask >> teardowntask:
-            [t1, t2] >> t3
-
-    assert t1.upstream_task_ids == {"setuptask"}
-    assert t2.upstream_task_ids == {"setuptask"}
-    assert t3.downstream_task_ids == {"teardowntask"}
diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py
index 4475e7141a..c021d98b88 100644
--- a/tests/utils/test_task_group.py
+++ b/tests/utils/test_task_group.py
@@ -23,7 +23,7 @@ import pendulum
 import pytest
 
 from airflow.decorators import dag, task as task_decorator, task_group as 
task_group_decorator
-from airflow.exceptions import AirflowException, TaskAlreadyInTaskGroup
+from airflow.exceptions import TaskAlreadyInTaskGroup
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.dag import DAG
 from airflow.models.xcom_arg import XComArg
@@ -1479,72 +1479,3 @@ def test_task_group_arrow_with_setups_teardowns():
         tg1 >> w2
     assert t1.downstream_task_ids == set()
     assert w1.downstream_task_ids == {"tg1.t1", "w2"}
-
-
-def test_tasks_defined_outside_taskgrooup(dag_maker):
-    # Test that classic tasks defined outside a task group are added to the 
root task group
-    # when the relationships are defined inside the task group
-    with dag_maker() as dag:
-        t1 = make_task("t1")
-        t2 = make_task("t2")
-        t3 = make_task("t3")
-        with TaskGroup(group_id="tg1"):
-            t1 >> t2 >> t3
-    dag.validate()
-    assert dag.task_group.children.keys() == {"tg1"}
-    assert dag.task_group.children["tg1"].children.keys() == {"t1", "t2", "t3"}
-    assert dag.task_group.children["tg1"].children["t1"].upstream_task_ids == 
set()
-    assert dag.task_group.children["tg1"].children["t1"].downstream_task_ids 
== {"t2"}
-    assert dag.task_group.children["tg1"].children["t2"].upstream_task_ids == 
{"t1"}
-    assert dag.task_group.children["tg1"].children["t2"].downstream_task_ids 
== {"t3"}
-    assert dag.task_group.children["tg1"].children["t3"].upstream_task_ids == 
{"t2"}
-    assert dag.task_group.children["tg1"].children["t3"].downstream_task_ids 
== set()
-
-    # Test that decorated tasks defined outside a task group are added to the 
root task group
-    # when relationships are defined inside the task group
-    with dag_maker() as dag:
-        t1 = make_task("t1", type_="decorated")
-        t2 = make_task("t2", type_="decorated")
-        t3 = make_task("t3", type_="decorated")
-        with TaskGroup(group_id="tg1"):
-            t1 >> t2 >> t3
-    dag.validate()
-    assert dag.task_group.children.keys() == {"tg1"}
-    assert dag.task_group.children["tg1"].children.keys() == {"t1", "t2", "t3"}
-    assert dag.task_group.children["tg1"].children["t1"].upstream_task_ids == 
set()
-    assert dag.task_group.children["tg1"].children["t1"].downstream_task_ids 
== {"t2"}
-    assert dag.task_group.children["tg1"].children["t2"].upstream_task_ids == 
{"t1"}
-    assert dag.task_group.children["tg1"].children["t2"].downstream_task_ids 
== {"t3"}
-    assert dag.task_group.children["tg1"].children["t3"].upstream_task_ids == 
{"t2"}
-    assert dag.task_group.children["tg1"].children["t3"].downstream_task_ids 
== set()
-
-    # Test adding single decorated task defined outside a task group to a task 
group
-    with dag_maker() as dag:
-        t1 = make_task("t1", type_="decorated")
-        with TaskGroup(group_id="tg1") as tg1:
-            tg1.add_task(t1)
-    dag.validate()
-    assert dag.task_group.children.keys() == {"tg1"}
-    assert dag.task_group.children["tg1"].children.keys() == {"t1"}
-    assert dag.task_group.children["tg1"].children["t1"].upstream_task_ids == 
set()
-    assert dag.task_group.children["tg1"].children["t1"].downstream_task_ids 
== set()
-
-    # Test adding single classic task defined outside a task group to a task 
group
-    with dag_maker() as dag:
-        t1 = make_task("t1")
-        with TaskGroup(group_id="tg1") as tg1:
-            tg1.add_task(t1)
-    dag.validate()
-    assert dag.task_group.children.keys() == {"tg1"}
-    assert dag.task_group.children["tg1"].children.keys() == {"t1"}
-    assert dag.task_group.children["tg1"].children["t1"].upstream_task_ids == 
set()
-    assert dag.task_group.children["tg1"].children["t1"].downstream_task_ids 
== set()
-
-    with pytest.raises(
-        AirflowException,
-        match="Using this method on a task group that's not a context manager 
is not supported.",
-    ):
-        with dag_maker():
-            t1 = make_task("t1")
-            tg1 = TaskGroup(group_id="tg1")
-            tg1.add_task(t1)

Reply via email to