This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f9278495b9 Allow to sort Grid View alphabetically (#32179)
f9278495b9 is described below

commit f9278495b930e070dea7dcb2b7fa05db601b3540
Author: Igor Khrol <[email protected]>
AuthorDate: Thu Jun 29 01:11:43 2023 +0300

    Allow to sort Grid View alphabetically (#32179)
---
 airflow/config_templates/config.yml          |  7 +++++
 airflow/config_templates/default_airflow.cfg |  3 ++
 airflow/utils/task_group.py                  | 12 ++++++++
 airflow/www/views.py                         | 17 +++++++++--
 tests/utils/test_task_group.py               | 42 ++++++++++++++++++++++++++++
 5 files changed, 79 insertions(+), 2 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index ea2bb03fa3..b6b1603265 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1545,6 +1545,13 @@ webserver:
       type: string
       example: ~
       default: "LR"
+    grid_view_sorting_order:
+      description: |
+        Sorting order in grid view. Valid values are: ``topological``, ``hierarchical_alphabetical``
+      version_added: 2.7.0
+      type: string
+      example: ~
+      default: "topological"
     log_fetch_timeout_sec:
       description: |
         The amount of time (in secs) webserver will wait for initial handshake
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index a806eef4d8..1fbf58f7bc 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -810,6 +810,9 @@ dag_default_view = grid
 # ``LR`` (Left->Right), ``TB`` (Top->Bottom), ``RL`` (Right->Left), ``BT`` (Bottom->Top)
 dag_orientation = LR
 
+# Sorting order in grid view. Valid values are: ``topological``, ``hierarchical_alphabetical``
+grid_view_sorting_order = topological
+
 # The amount of time (in secs) webserver will wait for initial handshake
 # while fetching logs from other worker machine
 log_fetch_timeout_sec = 5
diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py
index f20a5032a8..9224e55e69 100644
--- a/airflow/utils/task_group.py
+++ b/airflow/utils/task_group.py
@@ -438,6 +438,18 @@ class TaskGroup(DAGNode):
 
         return DagAttributeTypes.TASK_GROUP, TaskGroupSerialization.serialize_task_group(self)
 
+    def hierarchical_alphabetical_sort(self):
+        """
+        Sorts children in hierarchical alphabetical order:
+        - groups in alphabetical order first
+        - tasks in alphabetical order after them.
+
+        :return: list of tasks in hierarchical alphabetical order
+        """
+        return sorted(
+            self.children.values(), key=lambda node: (not isinstance(node, TaskGroup), node.node_id)
+        )
+
     def topological_sort(self, _include_subdag_tasks: bool = False):
         """
         Sorts children in topographical order, such that a task comes after any of its
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 32a0f0c19e..607927650b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -85,7 +85,12 @@ from airflow.api.common.mark_tasks import (
 )
 from airflow.configuration import AIRFLOW_CONFIG, conf
 from airflow.datasets import Dataset
-from airflow.exceptions import AirflowException, ParamValidationError, RemovedInAirflow3Warning
+from airflow.exceptions import (
+    AirflowConfigException,
+    AirflowException,
+    ParamValidationError,
+    RemovedInAirflow3Warning,
+)
 from airflow.executors.executor_loader import ExecutorLoader
 from airflow.jobs.job import Job
 from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
@@ -308,6 +313,14 @@ def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun], session: Session):
 
     grouped_tis = {task_id: list(tis) for task_id, tis in itertools.groupby(query, key=lambda ti: ti.task_id)}
 
+    sort_order = conf.get("webserver", "grid_view_sorting_order", fallback="topological")
+    if sort_order == "topological":
+        sort_children_fn = lambda task_group: task_group.topological_sort()
+    elif sort_order == "hierarchical_alphabetical":
+        sort_children_fn = lambda task_group: task_group.hierarchical_alphabetical_sort()
+    else:
+        raise AirflowConfigException(f"Unsupported grid_view_sorting_order: {sort_order}")
+
     def task_group_to_grid(item, grouped_tis, *, is_parent_mapped: bool):
         if not isinstance(item, TaskGroup):
 
@@ -384,7 +397,7 @@ def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun], session: Session):
 
         children = [
             task_group_to_grid(child, grouped_tis, is_parent_mapped=group_is_mapped)
-            for child in task_group.topological_sort()
+            for child in sort_children_fn(task_group)
         ]
 
         def get_summary(dag_run: DagRun):
diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py
index e7088afbd0..eb1e77e93f 100644
--- a/tests/utils/test_task_group.py
+++ b/tests/utils/test_task_group.py
@@ -1175,6 +1175,48 @@ def test_topological_nested_groups():
     ]
 
 
+def test_hierarchical_alphabetical_sort():
+    execution_date = pendulum.parse("20200101")
+    with DAG("test_dag_edges", start_date=execution_date) as dag:
+        task1 = EmptyOperator(task_id="task1")
+        task5 = EmptyOperator(task_id="task5")
+        with TaskGroup("group_c"):
+            task7 = EmptyOperator(task_id="task7")
+        with TaskGroup("group_b"):
+            task6 = EmptyOperator(task_id="task6")
+        with TaskGroup("group_a"):
+            with TaskGroup("group_d"):
+                task2 = EmptyOperator(task_id="task2")
+                task3 = EmptyOperator(task_id="task3")
+                task4 = EmptyOperator(task_id="task4")
+            task9 = EmptyOperator(task_id="task9")
+            task8 = EmptyOperator(task_id="task8")
+
+    def nested(group):
+        return [
+            nested(node) if isinstance(node, TaskGroup) else node
+            for node in group.hierarchical_alphabetical_sort()
+        ]
+
+    sorted_list = nested(dag.task_group)
+
+    assert sorted_list == [
+        [  # group_a
+            [  # group_d
+                task2,
+                task3,
+                task4,
+            ],
+            task8,
+            task9,
+        ],
+        [task6],  # group_b
+        [task7],  # group_c
+        task1,
+        task5,
+    ]
+
+
 def test_topological_group_dep():
     execution_date = pendulum.parse("20200101")
     with DAG("test_dag_edges", start_date=execution_date) as dag:

Reply via email to