This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 060532bddbc Fix grid view crash when task converted to TaskGroup
(#61208) (#61279)
060532bddbc is described below
commit 060532bddbc57e7183672e64352ab44902b06372
Author: y-sudharshan <[email protected]>
AuthorDate: Wed Feb 18 21:55:02 2026 +0530
Fix grid view crash when task converted to TaskGroup (#61208) (#61279)
* Fix grid view crash when task converted to TaskGroup (#61208)
This commit fixes a TypeError crash in the grid view endpoint when a task
is converted to a TaskGroup (or vice versa) between DAG versions.
Root Cause:
- Old DagRuns had task structure with children=None
- New DagRuns had TaskGroup structure with children=[...]
- The _merge_node_dicts function tried to iterate over None -> TypeError
Changes:
1. Added defensive None checks in _merge_node_dicts function
2. Only merge children if both nodes have children (not None)
3. Added comprehensive unit tests for edge cases
4. Added integration test for task->TaskGroup conversion scenario
Fixes #61208
* Fix merge logic and add comprehensive tests per reviewer feedback
* Fix integration test: use SerializedDagModel.write_dag instead of
DBDagBag.bag_dag
* Fix import: use serialized_objects.LazyDeserializedDAG instead of
definitions.dag
* Simplify test: use only sync_dag_to_db (removes redundant write_dag calls)
* Trigger CI re-run
* Fix test case: invert v1/v2 to test TaskGroup-to-task conversion
The bug in issue #61208 occurs when a TaskGroup is converted to a
simple task, not the other way around. This commit inverts Version 1
and Version 2 in the test case to properly test the actual bug scenario:
- Version 1: task_a is a TaskGroup with subtasks
- Version 2: task_a becomes a simple task
Updated all comments and assertions accordingly.
* Fix trailing blank line in test_grid.py
Pre-commit hooks removed trailing blank line - updating to match.
* Improve test assertions with full expected JSON
- Add API call after v1 to verify TaskGroup structure
- Replace partial assertions with full expected JSON comparison
- Verify v1 shows TaskGroup with children, v2 shows simple task
- Addresses review feedback from @jason810496
* Adjust and clean test
---------
Co-authored-by: pierrejeambrun <[email protected]>
---
.../api_fastapi/core_api/services/ui/grid.py | 16 ++++--
.../api_fastapi/core_api/routes/ui/test_grid.py | 67 ++++++++++++++++++++++
.../api_fastapi/core_api/services/ui/test_grid.py | 66 +++++++++++++++++++++
3 files changed, 145 insertions(+), 4 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
index 4dc97d5a7b4..46ec2c45d98 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
@@ -19,6 +19,7 @@ from __future__ import annotations
from collections import Counter
from collections.abc import Iterable
+from typing import Any
import structlog
@@ -32,15 +33,22 @@ from airflow.serialization.definitions.taskgroup import SerializedTaskGroup
log = structlog.get_logger(logger_name=__name__)
-def _merge_node_dicts(current, new) -> None:
+def _merge_node_dicts(current: list[dict[str, Any]], new: list[dict[str, Any]] | None) -> None:
+ """Merge node dictionaries from different DAG versions, handling structure changes."""
+ # Handle None case - can occur when merging old DAG versions
+ # where a TaskGroup was converted to a task or vice versa
+ if new is None:
+ return
+
current_nodes_by_id = {node["id"]: node for node in current}
for node in new:
node_id = node["id"]
current_node = current_nodes_by_id.get(node_id)
if current_node is not None:
- # if we have children, merge those as well
- if current_node.get("children"):
- _merge_node_dicts(current_node["children"], node.get("children", []))
+ # Only merge children if current node already has children
+ # This preserves the structure of the latest DAG version
+ if current_node.get("children") is not None:
+ _merge_node_dicts(current_node["children"], node.get("children"))
else:
current.append(node)
current_nodes_by_id[node_id] = node
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
index 3a3b44594c5..e8ccafaa32d 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
@@ -29,6 +29,7 @@ from airflow.models.dag import DagModel
from airflow.models.dagbag import DBDagBag
from airflow.models.taskinstance import TaskInstance
from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import task_group
from airflow.sdk.definitions.taskgroup import TaskGroup
from airflow.utils.session import provide_session
@@ -874,6 +875,72 @@ class TestGetGridDataEndpoint:
assert "is_mapped" not in t4
assert "children" not in t4
+ def test_task_converted_to_task_group_doesnt_crash(self, session, dag_maker, test_client):
+ """Test that converting a Task to a TaskGroup with same name doesn't crash grid view.
+
+ Regression test for https://github.com/apache/airflow/issues/61208
+ """
+
+ dag_id = "test_task_to_group_conversion"
+
+ # Version 1: task_a is a simple task
+ with dag_maker(
+ dag_id=dag_id,
+ start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+ schedule=None,
+ ):
+ PythonOperator(task_id="task_a", python_callable=lambda: True)
+ PythonOperator(task_id="task_b", python_callable=lambda: True)
+
+ # Create the first DagRun, using version 1 of the DAG
+ dag_maker.create_dagrun(
+ run_id="test_run_1",
+ run_type=DagRunType.MANUAL,
+ logical_date=pendulum.datetime(2024, 1, 3, tz="UTC"),
+ )
+
+ response_v1 = test_client.get(f"/grid/structure/{dag_id}")
+ assert response_v1.status_code == 200
+ nodes_v1 = response_v1.json()
+ assert nodes_v1 == [
+ {"id": "task_a", "label": "task_a"},
+ {"id": "task_b", "label": "task_b"},
+ ]
+
+ # Version 2: task_a is a TaskGroup with subtasks
+ with dag_maker(
+ dag_id=dag_id,
+ start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+ schedule=None,
+ serialized=True,
+ ):
+ with TaskGroup(group_id="task_a"):
+ PythonOperator(task_id="task_a1", python_callable=lambda: True)
+ PythonOperator(task_id="task_a2", python_callable=lambda: True)
+ PythonOperator(task_id="task_b", python_callable=lambda: True)
+
+ dag_maker.create_dagrun(
+ run_id="test_run_2",
+ run_type=DagRunType.MANUAL,
+ logical_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
+ )
+
+ # Verify v2 structure shows TaskGroup with children
+ response_v2 = test_client.get(f"/grid/structure/{dag_id}")
+ assert response_v2.status_code == 200
+ nodes_v2 = response_v2.json()
+ assert nodes_v2 == [
+ {
+ "id": "task_a",
+ "label": "task_a",
+ "children": [
+ {"id": "task_a.task_a1", "label": "task_a1"},
+ {"id": "task_a.task_a2", "label": "task_a2"},
+ ],
+ },
+ {"id": "task_b", "label": "task_b"},
+ ]
+
# Tests for root, include_upstream, and include_downstream parameters
@pytest.mark.parametrize(
("params", "expected_task_ids", "description"),
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/services/ui/test_grid.py b/airflow-core/tests/unit/api_fastapi/core_api/services/ui/test_grid.py
index 55c3532c171..9ca55115342 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/services/ui/test_grid.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/services/ui/test_grid.py
@@ -20,6 +20,72 @@ from __future__ import annotations
from airflow.api_fastapi.core_api.services.ui.grid import _merge_node_dicts
+def test_merge_node_dicts_with_none_new_list():
+ """Test merging with None new list doesn't crash.
+
+ Regression test for https://github.com/apache/airflow/issues/61208
+ When a TaskGroup is converted to a task, new can be None for some runs.
+ """
+ current = [{"id": "task1", "label": "Task 1"}]
+ new = None
+
+ _merge_node_dicts(current, new)
+
+ assert len(current) == 1
+ assert current[0]["id"] == "task1"
+
+
+def test_merge_node_dicts_preserves_taskgroup_structure():
+ """Test TaskGroup structure is preserved when converting to task."""
+ current = [
+ {
+ "id": "task_a",
+ "label": "Task A",
+ "children": [
+ {"id": "task_a.subtask1", "label": "Subtask 1"},
+ ],
+ }
+ ]
+ new = [{"id": "task_a", "label": "Task A", "children": None}]
+
+ _merge_node_dicts(current, new)
+
+ # Current structure (TaskGroup) is preserved
+ assert len(current) == 1
+ assert current[0]["id"] == "task_a"
+ assert current[0]["children"] is not None
+ assert len(current[0]["children"]) == 1
+
+
+def test_merge_node_dicts_merges_nested_children():
+ """Test merging nodes with nested children."""
+ current = [
+ {
+ "id": "group1",
+ "label": "Group 1",
+ "children": [
+ {"id": "group1.task1", "label": "Task 1"},
+ ],
+ }
+ ]
+ new = [
+ {
+ "id": "group1",
+ "label": "Group 1",
+ "children": [
+ {"id": "group1.task1", "label": "Task 1"},
+ {"id": "group1.task2", "label": "Task 2"},
+ ],
+ }
+ ]
+
+ _merge_node_dicts(current, new)
+
+ assert len(current) == 1
+ assert current[0]["id"] == "group1"
+ assert len(current[0]["children"]) == 2
+
+
def test_merge_node_dicts_merges_children_and_appends_new_nodes():
current = [
{