This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 060532bddbc Fix grid view crash when task converted to TaskGroup 
(#61208) (#61279)
060532bddbc is described below

commit 060532bddbc57e7183672e64352ab44902b06372
Author: y-sudharshan <[email protected]>
AuthorDate: Wed Feb 18 21:55:02 2026 +0530

    Fix grid view crash when task converted to TaskGroup (#61208) (#61279)
    
    * Fix grid view crash when task converted to TaskGroup (#61208)
    
    This commit fixes a TypeError crash in the grid view endpoint when a task
    is converted to a TaskGroup (or vice versa) between DAG versions.
    
    Root Cause:
    - Old DagRuns had task structure with children=None
    - New DagRuns had TaskGroup structure with children=[...]
    - The _merge_node_dicts function tried to iterate over None -> TypeError
    
    Changes:
    1. Added defensive None checks in _merge_node_dicts function
    2. Only merge children if both nodes have children (not None)
    3. Added comprehensive unit tests for edge cases
    4. Added integration test for task->TaskGroup conversion scenario
    
    Fixes #61208
    
    * Fix merge logic and add comprehensive tests per reviewer feedback
    
    * Fix integration test: use SerializedDagModel.write_dag instead of 
DBDagBag.bag_dag
    
    * Fix import: use serialized_objects.LazyDeserializedDAG instead of 
definitions.dag
    
    * Simplify test: use only sync_dag_to_db (removes redundant write_dag calls)
    
    * Trigger CI re-run
    
    * Fix test case: invert v1/v2 to test TaskGroup-to-task conversion
    
    The bug in issue #61208 occurs when a TaskGroup is converted to a
    simple task, not the other way around. This commit inverts Version 1
    and Version 2 in the test case to properly test the actual bug scenario:
    - Version 1: task_a is a TaskGroup with subtasks
    - Version 2: task_a becomes a simple task
    
    Updated all comments and assertions accordingly.
    
    * Fix trailing blank line in test_grid.py
    
    Pre-commit hooks removed trailing blank line - updating to match.
    
    * Improve test assertions with full expected JSON
    
    - Add API call after v1 to verify TaskGroup structure
    - Replace partial assertions with full expected JSON comparison
    - Verify v1 shows TaskGroup with children, v2 shows simple task
    - Addresses review feedback from @jason810496
    
    * Adjust and clean test
    
    ---------
    
    Co-authored-by: pierrejeambrun <[email protected]>
---
 .../api_fastapi/core_api/services/ui/grid.py       | 16 ++++--
 .../api_fastapi/core_api/routes/ui/test_grid.py    | 67 ++++++++++++++++++++++
 .../api_fastapi/core_api/services/ui/test_grid.py  | 66 +++++++++++++++++++++
 3 files changed, 145 insertions(+), 4 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py 
b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
index 4dc97d5a7b4..46ec2c45d98 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 
 from collections import Counter
 from collections.abc import Iterable
+from typing import Any
 
 import structlog
 
@@ -32,15 +33,22 @@ from airflow.serialization.definitions.taskgroup import 
SerializedTaskGroup
 log = structlog.get_logger(logger_name=__name__)
 
 
-def _merge_node_dicts(current, new) -> None:
+def _merge_node_dicts(current: list[dict[str, Any]], new: list[dict[str, Any]] 
| None) -> None:
+    """Merge node dictionaries from different DAG versions, handling structure 
changes."""
+    # Handle None case - can occur when merging old DAG versions
+    # where a TaskGroup was converted to a task or vice versa
+    if new is None:
+        return
+
     current_nodes_by_id = {node["id"]: node for node in current}
     for node in new:
         node_id = node["id"]
         current_node = current_nodes_by_id.get(node_id)
         if current_node is not None:
-            # if we have children, merge those as well
-            if current_node.get("children"):
-                _merge_node_dicts(current_node["children"], 
node.get("children", []))
+            # Only merge children if current node already has children
+            # This preserves the structure of the latest DAG version
+            if current_node.get("children") is not None:
+                _merge_node_dicts(current_node["children"], 
node.get("children"))
         else:
             current.append(node)
             current_nodes_by_id[node_id] = node
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
index 3a3b44594c5..e8ccafaa32d 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
@@ -29,6 +29,7 @@ from airflow.models.dag import DagModel
 from airflow.models.dagbag import DBDagBag
 from airflow.models.taskinstance import TaskInstance
 from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.providers.standard.operators.python import PythonOperator
 from airflow.sdk import task_group
 from airflow.sdk.definitions.taskgroup import TaskGroup
 from airflow.utils.session import provide_session
@@ -874,6 +875,72 @@ class TestGetGridDataEndpoint:
         assert "is_mapped" not in t4
         assert "children" not in t4
 
+    def test_task_converted_to_task_group_doesnt_crash(self, session, 
dag_maker, test_client):
+        """Test that converting a Task to a TaskGroup with same name doesn't 
crash grid view.
+
+        Regression test for https://github.com/apache/airflow/issues/61208
+        """
+
+        dag_id = "test_task_to_group_conversion"
+
+        # Version 1: task_a is a simple task
+        with dag_maker(
+            dag_id=dag_id,
+            start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+            schedule=None,
+        ):
+            PythonOperator(task_id="task_a", python_callable=lambda: True)
+            PythonOperator(task_id="task_b", python_callable=lambda: True)
+
+        # Create another DagRun with the new version
+        dag_maker.create_dagrun(
+            run_id="test_run_1",
+            run_type=DagRunType.MANUAL,
+            logical_date=pendulum.datetime(2024, 1, 3, tz="UTC"),
+        )
+
+        response_v1 = test_client.get(f"/grid/structure/{dag_id}")
+        assert response_v1.status_code == 200
+        nodes_v1 = response_v1.json()
+        assert nodes_v1 == [
+            {"id": "task_a", "label": "task_a"},
+            {"id": "task_b", "label": "task_b"},
+        ]
+
+        # Version 2: task_a is a TaskGroup with subtasks
+        with dag_maker(
+            dag_id=dag_id,
+            start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+            schedule=None,
+            serialized=True,
+        ):
+            with TaskGroup(group_id="task_a"):
+                PythonOperator(task_id="task_a1", python_callable=lambda: True)
+                PythonOperator(task_id="task_a2", python_callable=lambda: True)
+            PythonOperator(task_id="task_b", python_callable=lambda: True)
+
+        dag_maker.create_dagrun(
+            run_id="test_run_2",
+            run_type=DagRunType.MANUAL,
+            logical_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+        )
+
+        # Verify v2 structure shows TaskGroup with children
+        response_v2 = test_client.get(f"/grid/structure/{dag_id}")
+        assert response_v2.status_code == 200
+        nodes_v2 = response_v2.json()
+        assert nodes_v2 == [
+            {
+                "id": "task_a",
+                "label": "task_a",
+                "children": [
+                    {"id": "task_a.task_a1", "label": "task_a1"},
+                    {"id": "task_a.task_a2", "label": "task_a2"},
+                ],
+            },
+            {"id": "task_b", "label": "task_b"},
+        ]
+
     # Tests for root, include_upstream, and include_downstream parameters
     @pytest.mark.parametrize(
         ("params", "expected_task_ids", "description"),
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/services/ui/test_grid.py 
b/airflow-core/tests/unit/api_fastapi/core_api/services/ui/test_grid.py
index 55c3532c171..9ca55115342 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/services/ui/test_grid.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/services/ui/test_grid.py
@@ -20,6 +20,72 @@ from __future__ import annotations
 from airflow.api_fastapi.core_api.services.ui.grid import _merge_node_dicts
 
 
+def test_merge_node_dicts_with_none_new_list():
+    """Test merging with None new list doesn't crash.
+
+    Regression test for https://github.com/apache/airflow/issues/61208
+    When a TaskGroup is converted to a task, new can be None for some runs.
+    """
+    current = [{"id": "task1", "label": "Task 1"}]
+    new = None
+
+    _merge_node_dicts(current, new)
+
+    assert len(current) == 1
+    assert current[0]["id"] == "task1"
+
+
+def test_merge_node_dicts_preserves_taskgroup_structure():
+    """Test TaskGroup structure is preserved when converting to task."""
+    current = [
+        {
+            "id": "task_a",
+            "label": "Task A",
+            "children": [
+                {"id": "task_a.subtask1", "label": "Subtask 1"},
+            ],
+        }
+    ]
+    new = [{"id": "task_a", "label": "Task A", "children": None}]
+
+    _merge_node_dicts(current, new)
+
+    # Current structure (TaskGroup) is preserved
+    assert len(current) == 1
+    assert current[0]["id"] == "task_a"
+    assert current[0]["children"] is not None
+    assert len(current[0]["children"]) == 1
+
+
+def test_merge_node_dicts_merges_nested_children():
+    """Test merging nodes with nested children."""
+    current = [
+        {
+            "id": "group1",
+            "label": "Group 1",
+            "children": [
+                {"id": "group1.task1", "label": "Task 1"},
+            ],
+        }
+    ]
+    new = [
+        {
+            "id": "group1",
+            "label": "Group 1",
+            "children": [
+                {"id": "group1.task1", "label": "Task 1"},
+                {"id": "group1.task2", "label": "Task 2"},
+            ],
+        }
+    ]
+
+    _merge_node_dicts(current, new)
+
+    assert len(current) == 1
+    assert current[0]["id"] == "group1"
+    assert len(current[0]["children"]) == 2
+
+
 def test_merge_node_dicts_merges_children_and_appends_new_nodes():
     current = [
         {

Reply via email to