This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch v3-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push: new a20a8128bab [v3-0-test] Apply task group sorting based on webserver config in grid structure response (#49418) (#50138) a20a8128bab is described below commit a20a8128bab4b45a2294a57b3bba78ed4c11648d Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Sat May 3 07:47:25 2025 +0200 [v3-0-test] Apply task group sorting based on webserver config in grid structure response (#49418) (#50138) * Apply topological sort on task group grid * Fix task_group sorting by utilizing get_task_group_children_getter to sort based on webserver config * Adjust test expectation to sort children topologically (cherry picked from commit d833ecb242b6546d937495c466507adcddc4c9f2) Co-authored-by: Jason <46563896+jsjasons...@users.noreply.github.com> --- .../api_fastapi/core_api/services/ui/grid.py | 19 +----- airflow-core/src/airflow/utils/task_group.py | 16 ++++- airflow-core/tests/unit/utils/test_task_group.py | 72 +++++++++++----------- 3 files changed, 53 insertions(+), 54 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py index 7865ffa168f..346676e14cd 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py @@ -18,9 +18,6 @@ from __future__ import annotations import contextlib -from functools import cache -from operator import methodcaller -from typing import Callable from uuid import UUID import structlog @@ -38,7 +35,6 @@ from airflow.api_fastapi.core_api.datamodels.ui.grid import ( from airflow.api_fastapi.core_api.datamodels.ui.structure import ( StructureDataResponse, ) -from airflow.configuration import conf from airflow.models.baseoperator import BaseOperator as DBBaseOperator from airflow.models.dag_version import DagVersion from airflow.models.taskmap import TaskMap @@ -49,20 +45,11 @@ from airflow.sdk.definitions.mappedoperator import MappedOperator from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup from airflow.serialization.serialized_objects import SerializedDAG from airflow.utils.state import TaskInstanceState -from airflow.utils.task_group import task_group_to_dict +from airflow.utils.task_group import get_task_group_children_getter, task_group_to_dict log = structlog.get_logger(logger_name=__name__) -@cache -def get_task_group_children_getter() -> Callable: - """Get the Task Group Children Getter for the DAG.""" - sort_order = conf.get("webserver", "grid_view_sorting_order") - if sort_order == "topological": - return methodcaller("topological_sort") - return methodcaller("hierarchical_alphabetical_sort") - - def get_task_group_map(dag: DAG) -> dict[str, dict[str, Any]]: """ Get the Task Group Map for the DAG. @@ -262,7 +249,7 @@ def fill_task_instance_summaries( def get_structure_from_dag(dag: DAG) -> StructureDataResponse: """If we do not have TIs, we just get the structure from the DAG.""" - nodes = [task_group_to_dict(child) for child in dag.task_group.topological_sort()] + nodes = [task_group_to_dict(child) for child in get_task_group_children_getter()(dag.task_group)] return StructureDataResponse(nodes=nodes, edges=[]) @@ -299,7 +286,7 @@ def get_combined_structure(task_instances, session): if serdag: dags.append(serdag.dag) for dag in dags: - nodes = [task_group_to_dict(child) for child in dag.task_group.topological_sort()] + nodes = [task_group_to_dict(child) for child in get_task_group_children_getter()(dag.task_group)] _merge_node_dicts(merged_nodes, nodes) return StructureDataResponse(nodes=merged_nodes, edges=[]) diff --git a/airflow-core/src/airflow/utils/task_group.py b/airflow-core/src/airflow/utils/task_group.py index 51270fa473a..034eb6d1bb8 100644 --- a/airflow-core/src/airflow/utils/task_group.py +++ b/airflow-core/src/airflow/utils/task_group.py @@ -19,9 +19,12 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from functools import cache +from operator import methodcaller +from typing import TYPE_CHECKING, Callable import airflow.sdk.definitions.taskgroup +from airflow.configuration import conf if TYPE_CHECKING: from airflow.typing_compat import TypeAlias @@ -30,6 +33,15 @@ TaskGroup: TypeAlias = airflow.sdk.definitions.taskgroup.TaskGroup MappedTaskGroup: TypeAlias = airflow.sdk.definitions.taskgroup.MappedTaskGroup +@cache +def get_task_group_children_getter() -> Callable: + """Get the Task Group Children Getter for the DAG.""" + sort_order = conf.get("webserver", "grid_view_sorting_order") + if sort_order == "topological": + return methodcaller("topological_sort") + return methodcaller("hierarchical_alphabetical_sort") + + def task_group_to_dict(task_item_or_group, parent_group_is_mapped=False): """Create a nested dict representation of this TaskGroup and its children used to construct the Graph.""" from airflow.sdk.bases.operator import BaseOperator @@ -63,7 +75,7 @@ def task_group_to_dict(task_item_or_group, parent_group_is_mapped=False): is_mapped = isinstance(task_group, MappedTaskGroup) children = [ task_group_to_dict(child, parent_group_is_mapped=parent_group_is_mapped or is_mapped) - for child in sorted(task_group.children.values(), key=lambda t: t.label) + for child in get_task_group_children_getter()(task_group) ] if task_group.upstream_group_ids or task_group.upstream_task_ids: diff --git a/airflow-core/tests/unit/utils/test_task_group.py b/airflow-core/tests/unit/utils/test_task_group.py index 544d81d9661..8d6e6c0b334 100644 --- a/airflow-core/tests/unit/utils/test_task_group.py +++ b/airflow-core/tests/unit/utils/test_task_group.py @@ -65,6 +65,16 @@ EXPECTED_JSON_LEGACY = { "tooltip": "", }, "children": [ + { + "id": "task1", + "value": { + "label": "task1", + "labelStyle": "fill:#000;", + "style": "fill:#e8f7e4;", + "rx": 5, + "ry": 5, + }, + }, { "id": "group234", "value": { @@ -78,6 +88,16 @@ EXPECTED_JSON_LEGACY = { "isMapped": False, }, "children": [ + { + "id": "group234.task2", + "value": { + "label": "task2", + "labelStyle": "fill:#000;", + "style": "fill:#e8f7e4;", + "rx": 5, + "ry": 5, + }, + }, { "id": "group234.group34", "value": { @@ -122,16 +142,6 @@ EXPECTED_JSON_LEGACY = { }, ], }, - { - "id": "group234.task2", - "value": { - "label": "task2", - "labelStyle": "fill:#000;", - "style": "fill:#e8f7e4;", - "rx": 5, - "ry": 5, - }, - }, { "id": "group234.upstream_join_id", "value": { @@ -143,16 +153,6 @@ EXPECTED_JSON_LEGACY = { }, ], }, - { - "id": "task1", - "value": { - "label": "task1", - "labelStyle": "fill:#000;", - "style": "fill:#e8f7e4;", - "rx": 5, - "ry": 5, - }, - }, { "id": "task5", "value": { @@ -172,12 +172,14 @@ EXPECTED_JSON = { "tooltip": "", "is_mapped": False, "children": [ + {"id": "task1", "label": "task1", "operator": "EmptyOperator", "type": "task"}, { "id": "group234", "label": "group234", "tooltip": "", "is_mapped": False, "children": [ + {"id": "group234.task2", "label": "task2", "operator": "EmptyOperator", "type": "task"}, { "id": "group234.group34", "label": "group34", @@ -200,12 +202,10 @@ EXPECTED_JSON = { ], "type": "task", }, - {"id": "group234.task2", "label": "task2", "operator": "EmptyOperator", "type": "task"}, {"id": "group234.upstream_join_id", "label": "", "type": "join"}, ], "type": "task", }, - {"id": "task1", "label": "task1", "operator": "EmptyOperator", "type": "task"}, {"id": "task5", "label": "task5", "operator": "EmptyOperator", "type": "task"}, ], "type": "task", @@ -314,28 +314,28 @@ def test_build_task_group_with_prefix(): "id": None, "label": None, "children": [ + {"id": "task1", "label": "task1"}, { "id": "group234", "label": "group234", "children": [ + {"id": "task2", "label": "task2"}, { "id": "group34", "label": "group34", "children": [ + {"id": "group34.task3", "label": "task3"}, { "id": "group34.group4", "label": "group4", "children": [{"id": "task4", "label": "task4"}], }, - {"id": "group34.task3", "label": "task3"}, {"id": "group34.downstream_join_id", "label": ""}, ], }, - {"id": "task2", "label": "task2"}, {"id": "group234.upstream_join_id", "label": ""}, ], }, - {"id": "task1", "label": "task1"}, {"id": "task5", "label": "task5"}, ], } @@ -389,6 +389,7 @@ def test_build_task_group_with_task_decorator(): expected_node_id = { "id": None, "children": [ + {"id": "task_1"}, { "id": "group234", "children": [ @@ -399,7 +400,6 @@ def test_build_task_group_with_task_decorator(): {"id": "group234.downstream_join_id"}, ], }, - {"id": "task_1"}, {"id": "task_5"}, ], } @@ -448,6 +448,7 @@ def test_sub_dag_task_group(): expected_node_id = { "id": None, "children": [ + {"id": "task1"}, { "id": "group234", "children": [ @@ -462,7 +463,6 @@ def test_sub_dag_task_group(): {"id": "group234.upstream_join_id"}, ], }, - {"id": "task1"}, {"id": "task5"}, ], } @@ -540,6 +540,7 @@ def test_dag_edges(): expected_node_id = { "id": None, "children": [ + {"id": "task1"}, { "id": "group_a", "children": [ @@ -567,6 +568,8 @@ def test_dag_edges(): {"id": "group_c.downstream_join_id"}, ], }, + {"id": "task9"}, + {"id": "task10"}, { "id": "group_d", "children": [ @@ -575,9 +578,6 @@ def test_dag_edges(): {"id": "group_d.upstream_join_id"}, ], }, - {"id": "task1"}, - {"id": "task10"}, - {"id": "task9"}, ], } @@ -818,9 +818,12 @@ def test_build_task_group_deco_context_manager(): node_ids = { "id": None, "children": [ + {"id": "task_start"}, { "id": "section_1", "children": [ + {"id": "section_1.task_1"}, + {"id": "section_1.task_2"}, { "id": "section_1.section_2", "children": [ @@ -828,12 +831,9 @@ def test_build_task_group_deco_context_manager(): {"id": "section_1.section_2.task_4"}, ], }, - {"id": "section_1.task_1"}, - {"id": "section_1.task_2"}, ], }, {"id": "task_end"}, - {"id": "task_start"}, ], } @@ -992,6 +992,7 @@ def test_task_group_context_mix(): node_ids = { "id": None, "children": [ + {"id": "task_start"}, { "id": "section_1", "children": [ @@ -1011,7 +1012,6 @@ def test_task_group_context_mix(): ], }, {"id": "task_end"}, - {"id": "task_start"}, ], } @@ -1153,17 +1153,17 @@ def test_call_taskgroup_twice(): { "id": "task_group1", "children": [ - {"id": "task_group1.end_task"}, {"id": "task_group1.start_task"}, {"id": "task_group1.task"}, + {"id": "task_group1.end_task"}, ], }, { "id": "task_group1__1", "children": [ - {"id": "task_group1__1.end_task"}, {"id": "task_group1__1.start_task"}, {"id": "task_group1__1.task"}, + {"id": "task_group1__1.end_task"}, ], }, ],