This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ead38c3a8d Add tests for shift operators in DependencyMixin 2 (#37267)
ead38c3a8d is described below

commit ead38c3a8df56390c43d85c518b49e583fa65464
Author: raphaelauv <[email protected]>
AuthorDate: Sun Feb 11 21:03:54 2024 +0100

    Add tests for shift operators in DependencyMixin 2 (#37267)
    
    * Add tests for shift operators in DependencyMixin 2
    
    * review 1
    
    ---------
    
    Co-authored-by: raphaelauv <[email protected]>
---
 tests/models/test_taskmixin.py | 87 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 87 insertions(+)

diff --git a/tests/models/test_taskmixin.py b/tests/models/test_taskmixin.py
index 0e09a05c08..070483cfef 100644
--- a/tests/models/test_taskmixin.py
+++ b/tests/models/test_taskmixin.py
@@ -219,6 +219,32 @@ def 
test_cannot_set_on_failure_fail_dagrun_unless_teardown_taskflow(dag_maker):
 
 
 class TestDependencyMixin:
+    def test_set_upstream(self, dag_maker):
+        with dag_maker("test_set_upstream"):
+            op_a = EmptyOperator(task_id="a")
+            op_b = EmptyOperator(task_id="b")
+            op_c = EmptyOperator(task_id="c")
+            op_d = EmptyOperator(task_id="d")
+
+            op_d << op_c << op_b << op_a
+
+        assert [op_a] == op_b.upstream_list
+        assert [op_b] == op_c.upstream_list
+        assert [op_c] == op_d.upstream_list
+
+    def test_set_downstream(self, dag_maker):
+        with dag_maker("test_set_downstream"):
+            op_a = EmptyOperator(task_id="a")
+            op_b = EmptyOperator(task_id="b")
+            op_c = EmptyOperator(task_id="c")
+            op_d = EmptyOperator(task_id="d")
+
+            op_a >> op_b >> op_c >> op_d
+
+        assert [op_a] == op_b.upstream_list
+        assert [op_b] == op_c.upstream_list
+        assert [op_c] == op_d.upstream_list
+
     def test_set_upstream_list(self, dag_maker):
         with dag_maker("test_set_upstream_list"):
             op_a = EmptyOperator(task_id="a")
@@ -244,3 +270,64 @@ class TestDependencyMixin:
         assert [] == op_b.upstream_list
         assert [op_a] == op_d.upstream_list
         assert {op_a, op_b} == set(op_c.upstream_list)
+
+    def test_set_upstream_inner_list(self, dag_maker):
+        with dag_maker("test_set_upstream_inner_list"):
+            op_a = EmptyOperator(task_id="a")
+            op_b = EmptyOperator(task_id="b")
+            op_c = EmptyOperator(task_id="c")
+            op_d = EmptyOperator(task_id="d")
+        with pytest.raises(AttributeError) as e_info:
+            [op_d << [op_c, op_b]] << op_a
+
+        assert str(e_info.value) == "'list' object has no attribute 
'update_relative'"
+
+        assert [] == op_b.upstream_list
+        assert [] == op_c.upstream_list
+        assert {op_b, op_c} == set(op_d.upstream_list)
+
+    def test_set_downstream_inner_list(self, dag_maker):
+        with dag_maker("test_set_downstream_inner_list"):
+            op_a = EmptyOperator(task_id="a")
+            op_b = EmptyOperator(task_id="b")
+            op_c = EmptyOperator(task_id="c")
+            op_d = EmptyOperator(task_id="d")
+
+            op_a >> [[op_b, op_c] >> op_d]
+
+        assert [] == op_b.upstream_list
+        assert [] == op_c.upstream_list
+        assert {op_b, op_c, op_a} == set(op_d.upstream_list)
+
+    def test_set_upstream_list_subarray(self, dag_maker):
+        with dag_maker("test_set_upstream_list"):
+            op_a = EmptyOperator(task_id="a")
+            op_b_1 = EmptyOperator(task_id="b_1")
+            op_b_2 = EmptyOperator(task_id="b_2")
+            op_c = EmptyOperator(task_id="c")
+            op_d = EmptyOperator(task_id="d")
+
+        with pytest.raises(AttributeError) as e_info:
+            [op_d, op_c << [op_b_1, op_b_2]] << op_a
+
+        assert str(e_info.value) == "'list' object has no attribute 
'update_relative'"
+
+        assert [] == op_b_1.upstream_list
+        assert [] == op_b_2.upstream_list
+        assert [] == op_d.upstream_list
+        assert {op_b_1, op_b_2} == set(op_c.upstream_list)
+
+    def test_set_downstream_list_subarray(self, dag_maker):
+        with dag_maker("test_set_downstream_list"):
+            op_a = EmptyOperator(task_id="a")
+            op_b_1 = EmptyOperator(task_id="b_1")
+            op_b_2 = EmptyOperator(task_id="b2")
+            op_c = EmptyOperator(task_id="c")
+            op_d = EmptyOperator(task_id="d")
+
+            op_a >> [[op_b_1, op_b_2] >> op_c, op_d]
+
+        assert [] == op_b_1.upstream_list
+        assert [] == op_b_2.upstream_list
+        assert [op_a] == op_d.upstream_list
+        assert {op_a, op_b_1, op_b_2} == set(op_c.upstream_list)

Reply via email to