This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c3f53b1d598 Fix topological sort for Grid View (#56963)
c3f53b1d598 is described below
commit c3f53b1d598a55df42ba588fbd1dd10fab2f2ae8
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Tue Oct 21 19:27:18 2025 +0200
Fix topological sort for Grid View (#56963)
Fixes: https://github.com/apache/airflow/issues/55899
Closes https://github.com/apache/airflow/pull/56321
Similarly to
https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561,
we need an exit condition if the taskgroup is found in usorted graph.
Adjusted test, which indeed were not in the correct topological order.
Testing dag code:
```python
from __future__ import annotations
import datetime
import pendulum
from airflow.sdk import dag, task, task_group
@task
def get_nums() -> list[int]:
return [1, 2, 4]
@task
def times_2(n: int) -> int:
return n * 2
@task_group(group_id="process_number")
def process_number(n: int):
value = times_2(n)
return value
@task
def log_success() -> None:
print("Processed successful!")
@dag(
schedule=None,
catchup=False,
start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
dagrun_timeout=datetime.timedelta(minutes=30),
dag_id="55899_bug",
)
def test():
nums = get_nums()
processed = process_number.expand(n=nums)
processed >> log_success()
test()
```
### Before
<img width="1917" height="1016" alt="Screenshot 2025-10-21 at 17 57 20"
src="https://github.com/user-attachments/assets/d5220b87-a23b-40f7-8ecf-cb1b39d72f53"
/>
### After
<img width="1923" height="937" alt="Screenshot 2025-10-21 at 17 56 57"
src="https://github.com/user-attachments/assets/37f75b19-2e80-4765-b9cb-e425f9054b78"
/>
---
.../airflow/serialization/definitions/taskgroup.py | 4 +++
airflow-core/tests/unit/utils/test_task_group.py | 34 +++++++++++-----------
2 files changed, 21 insertions(+), 17 deletions(-)
diff --git a/airflow-core/src/airflow/serialization/definitions/taskgroup.py
b/airflow-core/src/airflow/serialization/definitions/taskgroup.py
index 4df819c97ec..6c0add8cdfb 100644
--- a/airflow-core/src/airflow/serialization/definitions/taskgroup.py
+++ b/airflow-core/src/airflow/serialization/definitions/taskgroup.py
@@ -238,6 +238,10 @@ class SerializedTaskGroup(DAGNode):
if tg.node_id in graph_unsorted:
break
tg = tg.parent_group
+
+ if tg:
+ # We are already going to visit that TG
+ break
else:
del graph_unsorted[node.node_id]
graph_sorted.append(node)
diff --git a/airflow-core/tests/unit/utils/test_task_group.py
b/airflow-core/tests/unit/utils/test_task_group.py
index 52524ecda2f..67ba538b6cb 100644
--- a/airflow-core/tests/unit/utils/test_task_group.py
+++ b/airflow-core/tests/unit/utils/test_task_group.py
@@ -158,7 +158,6 @@ EXPECTED_JSON_LEGACY = {
EXPECTED_JSON = {
"children": [
{"id": "task1", "label": "task1", "operator": "EmptyOperator", "type":
"task"},
- {"id": "task5", "label": "task5", "operator": "EmptyOperator", "type":
"task"},
{
"children": [
{
@@ -197,6 +196,7 @@ EXPECTED_JSON = {
"tooltip": "",
"type": "task",
},
+ {"id": "task5", "label": "task5", "operator": "EmptyOperator", "type":
"task"},
],
"id": None,
"is_mapped": False,
@@ -277,7 +277,6 @@ def test_task_group_to_dict_with_prefix(dag_maker):
expected_node_id = {
"children": [
{"id": "task1", "label": "task1"},
- {"id": "task5", "label": "task5"},
{
"id": "group234",
"label": "group234",
@@ -299,6 +298,7 @@ def test_task_group_to_dict_with_prefix(dag_maker):
{"id": "group234.upstream_join_id", "label": ""},
],
},
+ {"id": "task5", "label": "task5"},
],
"id": None,
"label": "",
@@ -347,7 +347,6 @@ def test_task_group_to_dict_with_task_decorator(dag_maker):
"id": None,
"children": [
{"id": "task_1"},
- {"id": "task_5"},
{
"id": "group234",
"children": [
@@ -358,6 +357,7 @@ def test_task_group_to_dict_with_task_decorator(dag_maker):
{"id": "group234.downstream_join_id"},
],
},
+ {"id": "task_5"},
],
}
@@ -403,7 +403,6 @@ def test_task_group_to_dict_sub_dag(dag_maker):
"id": None,
"children": [
{"id": "task1"},
- {"id": "task5"},
{
"id": "group234",
"children": [
@@ -418,6 +417,7 @@ def test_task_group_to_dict_sub_dag(dag_maker):
{"id": "group234.upstream_join_id"},
],
},
+ {"id": "task5"},
],
}
@@ -478,16 +478,6 @@ def test_task_group_to_dict_and_dag_edges(dag_maker):
expected_node_id = {
"id": None,
"children": [
- {
- "id": "group_c",
- "children": [
- {"id": "group_c.task6"},
- {"id": "group_c.task7"},
- {"id": "group_c.task8"},
- {"id": "group_c.upstream_join_id"},
- {"id": "group_c.downstream_join_id"},
- ],
- },
{
"id": "group_d",
"children": [
@@ -497,8 +487,6 @@ def test_task_group_to_dict_and_dag_edges(dag_maker):
],
},
{"id": "task1"},
- {"id": "task10"},
- {"id": "task9"},
{
"id": "group_a",
"children": [
@@ -516,6 +504,18 @@ def test_task_group_to_dict_and_dag_edges(dag_maker):
{"id": "group_a.downstream_join_id"},
],
},
+ {
+ "id": "group_c",
+ "children": [
+ {"id": "group_c.task6"},
+ {"id": "group_c.task7"},
+ {"id": "group_c.task8"},
+ {"id": "group_c.upstream_join_id"},
+ {"id": "group_c.downstream_join_id"},
+ ],
+ },
+ {"id": "task10"},
+ {"id": "task9"},
],
}
@@ -784,7 +784,6 @@ def test_task_group_context_mix(dag_maker):
node_ids = {
"id": None,
"children": [
- {"id": "task_end"},
{"id": "task_start"},
{
"id": "section_1",
@@ -804,6 +803,7 @@ def test_task_group_context_mix(dag_maker):
{"id": "section_1.downstream_join_id"},
],
},
+ {"id": "task_end"},
],
}