This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ccc33ffd109b64c6e41512d9cbaa38c53cabef7d
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Tue Oct 21 19:27:18 2025 +0200

    Fix topological sort for Grid View (#56963)
    
    Fixes: https://github.com/apache/airflow/issues/55899
    Closes https://github.com/apache/airflow/pull/56321
    
    Similarly to
    https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/taskgroup.py#L561,
    we need an exit condition if the taskgroup is found in the unsorted graph.
    
    Adjusted the tests, which indeed were not in the correct topological order.
    
    Testing dag code:
    ```python
    from __future__ import annotations
    
    import datetime
    
    import pendulum
    
    from airflow.sdk import dag, task, task_group
    
    @task
    def get_nums() -> list[int]:
        return [1, 2, 4]
    
    @task
    def times_2(n: int) -> int:
        return n * 2
    
    @task_group(group_id="process_number")
    def process_number(n: int):
        value = times_2(n)
        return value
    
    @task
    def log_success() -> None:
        print("Processed successful!")
    
    @dag(
        schedule=None,
        catchup=False,
        start_date=pendulum.datetime(2025, 4, 1, tz="Europe/Copenhagen"),
        dagrun_timeout=datetime.timedelta(minutes=30),
        dag_id="55899_bug",
    )
    def test():
        nums = get_nums()
        processed = process_number.expand(n=nums)
        processed >> log_success()
    
    test()
    ```
    
    ### Before
    <img width="1917" height="1016" alt="Screenshot 2025-10-21 at 17 57 20" src="https://github.com/user-attachments/assets/d5220b87-a23b-40f7-8ecf-cb1b39d72f53" />
    
    ### After
    <img width="1923" height="937" alt="Screenshot 2025-10-21 at 17 56 57" src="https://github.com/user-attachments/assets/37f75b19-2e80-4765-b9cb-e425f9054b78" />
    
    (cherry picked from commit c3f53b1d598a55df42ba588fbd1dd10fab2f2ae8)
---
 .../airflow/serialization/definitions/taskgroup.py |  4 +++
 airflow-core/tests/unit/utils/test_task_group.py   | 34 +++++++++++-----------
 2 files changed, 21 insertions(+), 17 deletions(-)

diff --git a/airflow-core/src/airflow/serialization/definitions/taskgroup.py b/airflow-core/src/airflow/serialization/definitions/taskgroup.py
index 4df819c97ec..6c0add8cdfb 100644
--- a/airflow-core/src/airflow/serialization/definitions/taskgroup.py
+++ b/airflow-core/src/airflow/serialization/definitions/taskgroup.py
@@ -238,6 +238,10 @@ class SerializedTaskGroup(DAGNode):
                         if tg.node_id in graph_unsorted:
                             break
                         tg = tg.parent_group
+
+                    if tg:
+                        # We are already going to visit that TG
+                        break
                 else:
                     del graph_unsorted[node.node_id]
                     graph_sorted.append(node)
diff --git a/airflow-core/tests/unit/utils/test_task_group.py b/airflow-core/tests/unit/utils/test_task_group.py
index 52524ecda2f..67ba538b6cb 100644
--- a/airflow-core/tests/unit/utils/test_task_group.py
+++ b/airflow-core/tests/unit/utils/test_task_group.py
@@ -158,7 +158,6 @@ EXPECTED_JSON_LEGACY = {
 EXPECTED_JSON = {
     "children": [
         {"id": "task1", "label": "task1", "operator": "EmptyOperator", "type": "task"},
-        {"id": "task5", "label": "task5", "operator": "EmptyOperator", "type": "task"},
         {
             "children": [
                 {
@@ -197,6 +196,7 @@ EXPECTED_JSON = {
             "tooltip": "",
             "type": "task",
         },
+        {"id": "task5", "label": "task5", "operator": "EmptyOperator", "type": "task"},
     ],
     "id": None,
     "is_mapped": False,
@@ -277,7 +277,6 @@ def test_task_group_to_dict_with_prefix(dag_maker):
     expected_node_id = {
         "children": [
             {"id": "task1", "label": "task1"},
-            {"id": "task5", "label": "task5"},
             {
                 "id": "group234",
                 "label": "group234",
@@ -299,6 +298,7 @@ def test_task_group_to_dict_with_prefix(dag_maker):
                     {"id": "group234.upstream_join_id", "label": ""},
                 ],
             },
+            {"id": "task5", "label": "task5"},
         ],
         "id": None,
         "label": "",
@@ -347,7 +347,6 @@ def test_task_group_to_dict_with_task_decorator(dag_maker):
         "id": None,
         "children": [
             {"id": "task_1"},
-            {"id": "task_5"},
             {
                 "id": "group234",
                 "children": [
@@ -358,6 +357,7 @@ def test_task_group_to_dict_with_task_decorator(dag_maker):
                     {"id": "group234.downstream_join_id"},
                 ],
             },
+            {"id": "task_5"},
         ],
     }
 
@@ -403,7 +403,6 @@ def test_task_group_to_dict_sub_dag(dag_maker):
         "id": None,
         "children": [
             {"id": "task1"},
-            {"id": "task5"},
             {
                 "id": "group234",
                 "children": [
@@ -418,6 +417,7 @@ def test_task_group_to_dict_sub_dag(dag_maker):
                     {"id": "group234.upstream_join_id"},
                 ],
             },
+            {"id": "task5"},
         ],
     }
 
@@ -478,16 +478,6 @@ def test_task_group_to_dict_and_dag_edges(dag_maker):
     expected_node_id = {
         "id": None,
         "children": [
-            {
-                "id": "group_c",
-                "children": [
-                    {"id": "group_c.task6"},
-                    {"id": "group_c.task7"},
-                    {"id": "group_c.task8"},
-                    {"id": "group_c.upstream_join_id"},
-                    {"id": "group_c.downstream_join_id"},
-                ],
-            },
             {
                 "id": "group_d",
                 "children": [
@@ -497,8 +487,6 @@ def test_task_group_to_dict_and_dag_edges(dag_maker):
                 ],
             },
             {"id": "task1"},
-            {"id": "task10"},
-            {"id": "task9"},
             {
                 "id": "group_a",
                 "children": [
@@ -516,6 +504,18 @@ def test_task_group_to_dict_and_dag_edges(dag_maker):
                     {"id": "group_a.downstream_join_id"},
                 ],
             },
+            {
+                "id": "group_c",
+                "children": [
+                    {"id": "group_c.task6"},
+                    {"id": "group_c.task7"},
+                    {"id": "group_c.task8"},
+                    {"id": "group_c.upstream_join_id"},
+                    {"id": "group_c.downstream_join_id"},
+                ],
+            },
+            {"id": "task10"},
+            {"id": "task9"},
         ],
     }
 
@@ -784,7 +784,6 @@ def test_task_group_context_mix(dag_maker):
     node_ids = {
         "id": None,
         "children": [
-            {"id": "task_end"},
             {"id": "task_start"},
             {
                 "id": "section_1",
@@ -804,6 +803,7 @@ def test_task_group_context_mix(dag_maker):
                     {"id": "section_1.downstream_join_id"},
                 ],
             },
+            {"id": "task_end"},
         ],
     }
 

Reply via email to