This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ead38c3a8d Add tests for shift operators in DependencyMixin 2 (#37267)
ead38c3a8d is described below
commit ead38c3a8df56390c43d85c518b49e583fa65464
Author: raphaelauv <[email protected]>
AuthorDate: Sun Feb 11 21:03:54 2024 +0100
Add tests for shift operators in DependencyMixin 2 (#37267)
* Add tests for shift operators in DependencyMixin 2
* review 1
---------
Co-authored-by: raphaelauv <[email protected]>
---
tests/models/test_taskmixin.py | 87 ++++++++++++++++++++++++++++++++++++++++++
1 file changed, 87 insertions(+)
diff --git a/tests/models/test_taskmixin.py b/tests/models/test_taskmixin.py
index 0e09a05c08..070483cfef 100644
--- a/tests/models/test_taskmixin.py
+++ b/tests/models/test_taskmixin.py
@@ -219,6 +219,32 @@ def test_cannot_set_on_failure_fail_dagrun_unless_teardown_taskflow(dag_maker):
class TestDependencyMixin:
+ def test_set_upstream(self, dag_maker):
+ with dag_maker("test_set_upstream"):
+ op_a = EmptyOperator(task_id="a")
+ op_b = EmptyOperator(task_id="b")
+ op_c = EmptyOperator(task_id="c")
+ op_d = EmptyOperator(task_id="d")
+
+ op_d << op_c << op_b << op_a
+
+ assert [op_a] == op_b.upstream_list
+ assert [op_b] == op_c.upstream_list
+ assert [op_c] == op_d.upstream_list
+
+ def test_set_downstream(self, dag_maker):
+ with dag_maker("test_set_downstream"):
+ op_a = EmptyOperator(task_id="a")
+ op_b = EmptyOperator(task_id="b")
+ op_c = EmptyOperator(task_id="c")
+ op_d = EmptyOperator(task_id="d")
+
+ op_a >> op_b >> op_c >> op_d
+
+ assert [op_a] == op_b.upstream_list
+ assert [op_b] == op_c.upstream_list
+ assert [op_c] == op_d.upstream_list
+
def test_set_upstream_list(self, dag_maker):
with dag_maker("test_set_upstream_list"):
op_a = EmptyOperator(task_id="a")
@@ -244,3 +270,64 @@ class TestDependencyMixin:
assert [] == op_b.upstream_list
assert [op_a] == op_d.upstream_list
assert {op_a, op_b} == set(op_c.upstream_list)
+
+ def test_set_upstream_inner_list(self, dag_maker):
+ with dag_maker("test_set_upstream_inner_list"):
+ op_a = EmptyOperator(task_id="a")
+ op_b = EmptyOperator(task_id="b")
+ op_c = EmptyOperator(task_id="c")
+ op_d = EmptyOperator(task_id="d")
+ with pytest.raises(AttributeError) as e_info:
+ [op_d << [op_c, op_b]] << op_a
+
+ assert str(e_info.value) == "'list' object has no attribute 'update_relative'"
+
+ assert [] == op_b.upstream_list
+ assert [] == op_c.upstream_list
+ assert {op_b, op_c} == set(op_d.upstream_list)
+
+ def test_set_downstream_inner_list(self, dag_maker):
+ with dag_maker("test_set_downstream_inner_list"):
+ op_a = EmptyOperator(task_id="a")
+ op_b = EmptyOperator(task_id="b")
+ op_c = EmptyOperator(task_id="c")
+ op_d = EmptyOperator(task_id="d")
+
+ op_a >> [[op_b, op_c] >> op_d]
+
+ assert [] == op_b.upstream_list
+ assert [] == op_c.upstream_list
+ assert {op_b, op_c, op_a} == set(op_d.upstream_list)
+
+ def test_set_upstream_list_subarray(self, dag_maker):
+ with dag_maker("test_set_upstream_list"):
+ op_a = EmptyOperator(task_id="a")
+ op_b_1 = EmptyOperator(task_id="b_1")
+ op_b_2 = EmptyOperator(task_id="b_2")
+ op_c = EmptyOperator(task_id="c")
+ op_d = EmptyOperator(task_id="d")
+
+ with pytest.raises(AttributeError) as e_info:
+ [op_d, op_c << [op_b_1, op_b_2]] << op_a
+
+ assert str(e_info.value) == "'list' object has no attribute 'update_relative'"
+
+ assert [] == op_b_1.upstream_list
+ assert [] == op_b_2.upstream_list
+ assert [] == op_d.upstream_list
+ assert {op_b_1, op_b_2} == set(op_c.upstream_list)
+
+ def test_set_downstream_list_subarray(self, dag_maker):
+ with dag_maker("test_set_downstream_list"):
+ op_a = EmptyOperator(task_id="a")
+ op_b_1 = EmptyOperator(task_id="b_1")
+ op_b_2 = EmptyOperator(task_id="b2")
+ op_c = EmptyOperator(task_id="c")
+ op_d = EmptyOperator(task_id="d")
+
+ op_a >> [[op_b_1, op_b_2] >> op_c, op_d]
+
+ assert [] == op_b_1.upstream_list
+ assert [] == op_b_2.upstream_list
+ assert [op_a] == op_d.upstream_list
+ assert {op_a, op_b_1, op_b_2} == set(op_c.upstream_list)