hussein-awala commented on code in PR #37267:
URL: https://github.com/apache/airflow/pull/37267#discussion_r1484512106
##########
tests/models/test_taskmixin.py:
##########
@@ -244,3 +270,67 @@ def test_set_downstream_list(self, dag_maker):
assert [] == op_b.upstream_list
assert [op_a] == op_d.upstream_list
assert {op_a, op_b} == set(op_c.upstream_list)
+
+ def test_set_upstream_inner_list(self, dag_maker):
+ with dag_maker("test_set_upstream_inner_list"):
+ op_a = EmptyOperator(task_id="a")
+ op_b = EmptyOperator(task_id="b")
+ op_c = EmptyOperator(task_id="c")
+ op_d = EmptyOperator(task_id="d")
+ with pytest.raises(AttributeError) as e_info:
+ [op_d << [op_c, op_b]] << op_a
+
+ assert str(e_info.value) == "'list' object has no attribute
'update_relative'"
+
+ assert [] == op_b.upstream_list
+ assert [] == op_c.upstream_list
+ assert {op_b, op_c} == set(op_d.upstream_list)
Review Comment:
not sure if this check is useful after the exception
##########
tests/models/test_taskmixin.py:
##########
@@ -244,3 +270,67 @@ def test_set_downstream_list(self, dag_maker):
assert [] == op_b.upstream_list
assert [op_a] == op_d.upstream_list
assert {op_a, op_b} == set(op_c.upstream_list)
+
+ def test_set_upstream_inner_list(self, dag_maker):
+ with dag_maker("test_set_upstream_inner_list"):
+ op_a = EmptyOperator(task_id="a")
+ op_b = EmptyOperator(task_id="b")
+ op_c = EmptyOperator(task_id="c")
+ op_d = EmptyOperator(task_id="d")
+ with pytest.raises(AttributeError) as e_info:
+ [op_d << [op_c, op_b]] << op_a
+
+ assert str(e_info.value) == "'list' object has no attribute
'update_relative'"
+
+ assert [] == op_b.upstream_list
+ assert [] == op_c.upstream_list
+ assert {op_b, op_c} == set(op_d.upstream_list)
+
+ def test_set_downstream_inner_list(self, dag_maker):
+ with dag_maker("test_set_downstream_inner_list"):
+ op_a = EmptyOperator(task_id="a")
+ op_b = EmptyOperator(task_id="b")
+ op_c = EmptyOperator(task_id="c")
+ op_d = EmptyOperator(task_id="d")
+
+ op_a >> [[op_b, op_c] >> op_d]
+
+ assert [] == op_b.upstream_list
+ assert [] == op_c.upstream_list
+ assert {op_b, op_c, op_a} == set(op_d.upstream_list)
+
+ def test_set_upstream_list_subarray(self, dag_maker):
+ with dag_maker("test_set_upstream_list"):
+ op_a = EmptyOperator(task_id="a")
+ op_b_1 = EmptyOperator(task_id="b_1")
+ op_b_2 = EmptyOperator(task_id="b_2")
+ op_c = EmptyOperator(task_id="c")
+ op_d = EmptyOperator(task_id="d")
+
+ with pytest.raises(AttributeError) as e_info:
+ [op_d, op_c << [op_b_1, op_b_2]] << op_a
+
+ assert str(e_info.value) == "'list' object has no attribute
'update_relative'"
+
+ assert [] == op_b_1.upstream_list
+ assert [] == op_b_2.upstream_list
+ assert [] == op_d.upstream_list
+ assert op_b_1 in op_c.upstream_list
+ assert op_b_2 in op_c.upstream_list
+
+ def test_set_downstream_list_subarray(self, dag_maker):
+ with dag_maker("test_set_downstream_list"):
+ op_a = EmptyOperator(task_id="a")
+ op_b_1 = EmptyOperator(task_id="b_1")
+ op_b_2 = EmptyOperator(task_id="b2")
+ op_c = EmptyOperator(task_id="c")
+ op_d = EmptyOperator(task_id="d")
+
+ op_a >> [[op_b_1, op_b_2] >> op_c, op_d]
+
+ assert [] == op_b_1.upstream_list
+ assert [] == op_b_2.upstream_list
+ assert [op_a] == op_d.upstream_list
+ assert op_a in op_c.upstream_list
+ assert op_b_1 in op_c.upstream_list
+ assert op_b_2 in op_c.upstream_list
Review Comment:
you can
```suggestion
assert {op_a, op_b_1, op_b_2} == set(op_c.upstream_list)
```
##########
tests/models/test_taskmixin.py:
##########
@@ -244,3 +270,67 @@ def test_set_downstream_list(self, dag_maker):
assert [] == op_b.upstream_list
assert [op_a] == op_d.upstream_list
assert {op_a, op_b} == set(op_c.upstream_list)
+
+ def test_set_upstream_inner_list(self, dag_maker):
+ with dag_maker("test_set_upstream_inner_list"):
+ op_a = EmptyOperator(task_id="a")
+ op_b = EmptyOperator(task_id="b")
+ op_c = EmptyOperator(task_id="c")
+ op_d = EmptyOperator(task_id="d")
+ with pytest.raises(AttributeError) as e_info:
+ [op_d << [op_c, op_b]] << op_a
+
+ assert str(e_info.value) == "'list' object has no attribute
'update_relative'"
+
+ assert [] == op_b.upstream_list
+ assert [] == op_c.upstream_list
+ assert {op_b, op_c} == set(op_d.upstream_list)
+
+ def test_set_downstream_inner_list(self, dag_maker):
+ with dag_maker("test_set_downstream_inner_list"):
+ op_a = EmptyOperator(task_id="a")
+ op_b = EmptyOperator(task_id="b")
+ op_c = EmptyOperator(task_id="c")
+ op_d = EmptyOperator(task_id="d")
+
+ op_a >> [[op_b, op_c] >> op_d]
+
+ assert [] == op_b.upstream_list
+ assert [] == op_c.upstream_list
+ assert {op_b, op_c, op_a} == set(op_d.upstream_list)
+
+ def test_set_upstream_list_subarray(self, dag_maker):
+ with dag_maker("test_set_upstream_list"):
+ op_a = EmptyOperator(task_id="a")
+ op_b_1 = EmptyOperator(task_id="b_1")
+ op_b_2 = EmptyOperator(task_id="b_2")
+ op_c = EmptyOperator(task_id="c")
+ op_d = EmptyOperator(task_id="d")
+
+ with pytest.raises(AttributeError) as e_info:
+ [op_d, op_c << [op_b_1, op_b_2]] << op_a
+
+ assert str(e_info.value) == "'list' object has no attribute
'update_relative'"
+
+ assert [] == op_b_1.upstream_list
+ assert [] == op_b_2.upstream_list
+ assert [] == op_d.upstream_list
+ assert op_b_1 in op_c.upstream_list
+ assert op_b_2 in op_c.upstream_list
Review Comment:
same here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]