uranusjr commented on code in PR #33097:
URL: https://github.com/apache/airflow/pull/33097#discussion_r1284051665
##########
tests/utils/test_task_group.py:
##########
@@ -1548,3 +1554,128 @@ def test_tasks_defined_outside_taskgrooup(dag_maker):
t1 = make_task("t1")
tg1 = TaskGroup(group_id="tg1")
tg1.add_task(t1)
+
+
+def test_task_group_arrow_with_setup_group():
+ with DAG(dag_id="setup_group_teardown_group", start_date=pendulum.now()):
+ with TaskGroup("group_1") as g1:
+
+ @setup
+ def setup_1():
+ ...
+
+ @setup
+ def setup_2():
+ ...
+
+ s1 = setup_1()
+ s2 = setup_2()
+
+ with TaskGroup("group_2") as g2:
+
+ @teardown
+ def teardown_1():
+ ...
+
+ @teardown
+ def teardown_2():
+ ...
+
+ t1 = teardown_1()
+ t2 = teardown_2()
+
+ @task_decorator
+ def work():
+ ...
+
+ w1 = work()
+ g1 >> w1 >> g2
+ t1.as_teardown(setups=s1)
+ t2.as_teardown(setups=s2)
+ assert set(s1.operator.downstream_task_ids) == {"work",
"group_2.teardown_1"}
+ assert set(s2.operator.downstream_task_ids) == {"work",
"group_2.teardown_2"}
+ assert set(w1.operator.downstream_task_ids) == {"group_2.teardown_1",
"group_2.teardown_2"}
+ assert set(t1.operator.downstream_task_ids) == set()
+ assert set(t2.operator.downstream_task_ids) == set()
+
+ def get_nodes(group):
+ d = task_group_to_dict(group)
+ new_d = {}
+ new_d["id"] = d["id"]
+ new_d["children"] = [{"id": x["id"]} for x in d["children"]]
+ return new_d
+
+ assert get_nodes(g1) == {
+ "id": "group_1",
+ "children": [
+ {"id": "group_1.setup_1"},
+ {"id": "group_1.setup_2"},
+ {"id": "group_1.downstream_join_id"},
+ ],
+ }
+
+
+def test_task_group_arrow_with_setup_group_deeper_setup():
+ with DAG(dag_id="setup_group_teardown_group_2", start_date=pendulum.now()):
+ with TaskGroup("group_1") as g1:
+
+ @setup
+ def setup_1():
+ ...
+
+ @setup
+ def setup_2():
+ ...
+
+ @teardown
+ def teardown_0():
+ ...
+
+ s1 = setup_1()
+ s2 = setup_2()
+ t0 = teardown_0()
+ s2 >> t0
+
+ with TaskGroup("group_2") as g2:
+
+ @teardown
+ def teardown_1():
+ ...
+
+ @teardown
+ def teardown_2():
+ ...
+
+ t1 = teardown_1()
+ t2 = teardown_2()
+
+ @task_decorator
+ def work():
+ ...
+
+ w1 = work()
+ g1 >> w1 >> g2
+ t1.as_teardown(setups=s1)
+ t2.as_teardown(setups=s2)
+ assert set(s1.operator.downstream_task_ids) == {"work",
"group_2.teardown_1"}
+ assert set(s2.operator.downstream_task_ids) == {"work",
"group_1.teardown_0", "group_2.teardown_2"}
+ assert set(w1.operator.downstream_task_ids) == {"group_2.teardown_1",
"group_2.teardown_2"}
+ assert set(t1.operator.downstream_task_ids) == set()
+ assert set(t2.operator.downstream_task_ids) == set()
Review Comment:
Aren’t these sets to begin with?
```suggestion
assert s1.operator.downstream_task_ids == {"work", "group_2.teardown_1"}
assert s2.operator.downstream_task_ids == {"work", "group_1.teardown_0",
"group_2.teardown_2"}
assert w1.operator.downstream_task_ids == {"group_2.teardown_1",
"group_2.teardown_2"}
assert t1.operator.downstream_task_ids == set()
assert t2.operator.downstream_task_ids == set()
```
##########
tests/utils/test_task_group.py:
##########
@@ -1548,3 +1554,128 @@ def test_tasks_defined_outside_taskgrooup(dag_maker):
t1 = make_task("t1")
tg1 = TaskGroup(group_id="tg1")
tg1.add_task(t1)
+
+
+def test_task_group_arrow_with_setup_group():
+ with DAG(dag_id="setup_group_teardown_group", start_date=pendulum.now()):
+ with TaskGroup("group_1") as g1:
+
+ @setup
+ def setup_1():
+ ...
+
+ @setup
+ def setup_2():
+ ...
+
+ s1 = setup_1()
+ s2 = setup_2()
+
+ with TaskGroup("group_2") as g2:
+
+ @teardown
+ def teardown_1():
+ ...
+
+ @teardown
+ def teardown_2():
+ ...
+
+ t1 = teardown_1()
+ t2 = teardown_2()
+
+ @task_decorator
+ def work():
+ ...
+
+ w1 = work()
+ g1 >> w1 >> g2
+ t1.as_teardown(setups=s1)
+ t2.as_teardown(setups=s2)
+ assert set(s1.operator.downstream_task_ids) == {"work",
"group_2.teardown_1"}
+ assert set(s2.operator.downstream_task_ids) == {"work",
"group_2.teardown_2"}
+ assert set(w1.operator.downstream_task_ids) == {"group_2.teardown_1",
"group_2.teardown_2"}
+ assert set(t1.operator.downstream_task_ids) == set()
+ assert set(t2.operator.downstream_task_ids) == set()
+
+ def get_nodes(group):
+ d = task_group_to_dict(group)
+ new_d = {}
+ new_d["id"] = d["id"]
+ new_d["children"] = [{"id": x["id"]} for x in d["children"]]
+ return new_d
+
+ assert get_nodes(g1) == {
+ "id": "group_1",
+ "children": [
+ {"id": "group_1.setup_1"},
+ {"id": "group_1.setup_2"},
+ {"id": "group_1.downstream_join_id"},
+ ],
+ }
+
+
+def test_task_group_arrow_with_setup_group_deeper_setup():
+ with DAG(dag_id="setup_group_teardown_group_2", start_date=pendulum.now()):
+ with TaskGroup("group_1") as g1:
+
+ @setup
+ def setup_1():
+ ...
+
+ @setup
+ def setup_2():
+ ...
+
+ @teardown
+ def teardown_0():
+ ...
+
+ s1 = setup_1()
+ s2 = setup_2()
+ t0 = teardown_0()
+ s2 >> t0
+
+ with TaskGroup("group_2") as g2:
+
+ @teardown
+ def teardown_1():
+ ...
+
+ @teardown
+ def teardown_2():
+ ...
+
+ t1 = teardown_1()
+ t2 = teardown_2()
+
+ @task_decorator
+ def work():
+ ...
+
+ w1 = work()
+ g1 >> w1 >> g2
+ t1.as_teardown(setups=s1)
+ t2.as_teardown(setups=s2)
+ assert set(s1.operator.downstream_task_ids) == {"work",
"group_2.teardown_1"}
+ assert set(s2.operator.downstream_task_ids) == {"work",
"group_1.teardown_0", "group_2.teardown_2"}
+ assert set(w1.operator.downstream_task_ids) == {"group_2.teardown_1",
"group_2.teardown_2"}
+ assert set(t1.operator.downstream_task_ids) == set()
+ assert set(t2.operator.downstream_task_ids) == set()
Review Comment:
Aren’t these sets to begin with?
```suggestion
assert s1.operator.downstream_task_ids == {"work", "group_2.teardown_1"}
assert s2.operator.downstream_task_ids == {"work", "group_1.teardown_0",
"group_2.teardown_2"}
assert w1.operator.downstream_task_ids == {"group_2.teardown_1",
"group_2.teardown_2"}
assert t1.operator.downstream_task_ids == set()
assert t2.operator.downstream_task_ids == set()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]