This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ccc33ffd109b64c6e41512d9cbaa38c53cabef7d Author: Pierre Jeambrun <[email protected]> AuthorDate: Tue Oct 21 19:27:18 2025 +0200 Fix topological sort for Grid View (#56963) Fixes: https://github.com/apache/airflow/issues/55899 Closes https://github.com/apache/airflow/pull/56321 Similarly to https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561, we need an exit condition if the taskgroup is found in usorted graph. Adjusted test, which indeed were not in the correct topological order. Testing dag code: ```python from __future__ import annotations import datetime import pendulum from airflow.sdk import dag, task, task_group @task def get_nums() -> list[int]: return [1, 2, 4] @task def times_2(n: int) -> int: return n * 2 @task_group(group_id="process_number") def process_number(n: int): value = times_2(n) return value @task def log_success() -> None: print("Processed successful!") @dag( schedule=None, catchup=False, start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"), dagrun_timeout=datetime.timedelta(minutes=30), dag_id="55899_bug", ) def test(): nums = get_nums() processed = process_number.expand(n=nums) processed >> log_success() test() ``` ### Before <img width="1917" height="1016" alt="Screenshot 2025-10-21 at 17 57 20" src="https://github.com/user-attachments/assets/d5220b87-a23b-40f7-8ecf-cb1b39d72f53" /> ### After <img width="1923" height="937" alt="Screenshot 2025-10-21 at 17 56 57" src="https://github.com/user-attachments/assets/37f75b19-2e80-4765-b9cb-e425f9054b78" /> (cherry picked from commit c3f53b1d598a55df42ba588fbd1dd10fab2f2ae8) --- .../airflow/serialization/definitions/taskgroup.py | 4 +++ airflow-core/tests/unit/utils/test_task_group.py | 34 +++++++++++----------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/airflow-core/src/airflow/serialization/definitions/taskgroup.py b/airflow-core/src/airflow/serialization/definitions/taskgroup.py index 4df819c97ec..6c0add8cdfb 100644 --- a/airflow-core/src/airflow/serialization/definitions/taskgroup.py +++ b/airflow-core/src/airflow/serialization/definitions/taskgroup.py @@ -238,6 +238,10 @@ class SerializedTaskGroup(DAGNode): if tg.node_id in graph_unsorted: break tg = tg.parent_group + + if tg: + # We are already going to visit that TG + break else: del graph_unsorted[node.node_id] graph_sorted.append(node) diff --git a/airflow-core/tests/unit/utils/test_task_group.py b/airflow-core/tests/unit/utils/test_task_group.py index 52524ecda2f..67ba538b6cb 100644 --- a/airflow-core/tests/unit/utils/test_task_group.py +++ b/airflow-core/tests/unit/utils/test_task_group.py @@ -158,7 +158,6 @@ EXPECTED_JSON_LEGACY = { EXPECTED_JSON = { "children": [ {"id": "task1", "label": "task1", "operator": "EmptyOperator", "type": "task"}, - {"id": "task5", "label": "task5", "operator": "EmptyOperator", "type": "task"}, { "children": [ { @@ -197,6 +196,7 @@ EXPECTED_JSON = { "tooltip": "", "type": "task", }, + {"id": "task5", "label": "task5", "operator": "EmptyOperator", "type": "task"}, ], "id": None, "is_mapped": False, @@ -277,7 +277,6 @@ def test_task_group_to_dict_with_prefix(dag_maker): expected_node_id = { "children": [ {"id": "task1", "label": "task1"}, - {"id": "task5", "label": "task5"}, { "id": "group234", "label": "group234", @@ -299,6 +298,7 @@ def test_task_group_to_dict_with_prefix(dag_maker): {"id": "group234.upstream_join_id", "label": ""}, ], }, + {"id": "task5", "label": "task5"}, ], "id": None, "label": "", @@ -347,7 +347,6 @@ def test_task_group_to_dict_with_task_decorator(dag_maker): "id": None, "children": [ {"id": "task_1"}, - {"id": "task_5"}, { "id": "group234", "children": [ @@ -358,6 +357,7 @@ def test_task_group_to_dict_with_task_decorator(dag_maker): {"id": "group234.downstream_join_id"}, ], }, + {"id": "task_5"}, ], } @@ -403,7 +403,6 @@ def test_task_group_to_dict_sub_dag(dag_maker): "id": None, "children": [ {"id": "task1"}, - {"id": "task5"}, { "id": "group234", "children": [ @@ -418,6 +417,7 @@ def test_task_group_to_dict_sub_dag(dag_maker): {"id": "group234.upstream_join_id"}, ], }, + {"id": "task5"}, ], } @@ -478,16 +478,6 @@ def test_task_group_to_dict_and_dag_edges(dag_maker): expected_node_id = { "id": None, "children": [ - { - "id": "group_c", - "children": [ - {"id": "group_c.task6"}, - {"id": "group_c.task7"}, - {"id": "group_c.task8"}, - {"id": "group_c.upstream_join_id"}, - {"id": "group_c.downstream_join_id"}, - ], - }, { "id": "group_d", "children": [ @@ -497,8 +487,6 @@ def test_task_group_to_dict_and_dag_edges(dag_maker): ], }, {"id": "task1"}, - {"id": "task10"}, - {"id": "task9"}, { "id": "group_a", "children": [ @@ -516,6 +504,18 @@ def test_task_group_to_dict_and_dag_edges(dag_maker): {"id": "group_a.downstream_join_id"}, ], }, + { + "id": "group_c", + "children": [ + {"id": "group_c.task6"}, + {"id": "group_c.task7"}, + {"id": "group_c.task8"}, + {"id": "group_c.upstream_join_id"}, + {"id": "group_c.downstream_join_id"}, + ], + }, + {"id": "task10"}, + {"id": "task9"}, ], } @@ -784,7 +784,6 @@ def test_task_group_context_mix(dag_maker): node_ids = { "id": None, "children": [ - {"id": "task_end"}, {"id": "task_start"}, { "id": "section_1", @@ -804,6 +803,7 @@ def test_task_group_context_mix(dag_maker): {"id": "section_1.downstream_join_id"}, ], }, + {"id": "task_end"}, ], }
