This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f9278495b9 Allow to sort Grid View alphabetically (#32179)
f9278495b9 is described below
commit f9278495b930e070dea7dcb2b7fa05db601b3540
Author: Igor Khrol <[email protected]>
AuthorDate: Thu Jun 29 01:11:43 2023 +0300
Allow to sort Grid View alphabetically (#32179)
---
airflow/config_templates/config.yml | 7 +++++
airflow/config_templates/default_airflow.cfg | 3 ++
airflow/utils/task_group.py | 12 ++++++++
airflow/www/views.py | 17 +++++++++--
tests/utils/test_task_group.py | 42 ++++++++++++++++++++++++++++
5 files changed, 79 insertions(+), 2 deletions(-)
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index ea2bb03fa3..b6b1603265 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1545,6 +1545,13 @@ webserver:
type: string
example: ~
default: "LR"
+ grid_view_sorting_order:
+ description: |
+ Sorting order in grid view. Valid values are: ``topological``, ``hierarchical_alphabetical``
+ version_added: 2.7.0
+ type: string
+ example: ~
+ default: "topological"
log_fetch_timeout_sec:
description: |
The amount of time (in secs) webserver will wait for initial handshake
diff --git a/airflow/config_templates/default_airflow.cfg
b/airflow/config_templates/default_airflow.cfg
index a806eef4d8..1fbf58f7bc 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -810,6 +810,9 @@ dag_default_view = grid
# ``LR`` (Left->Right), ``TB`` (Top->Bottom), ``RL`` (Right->Left), ``BT`` (Bottom->Top)
dag_orientation = LR
+# Sorting order in grid view. Valid values are: ``topological``, ``hierarchical_alphabetical``
+grid_view_sorting_order = topological
+
# The amount of time (in secs) webserver will wait for initial handshake
# while fetching logs from other worker machine
log_fetch_timeout_sec = 5
diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py
index f20a5032a8..9224e55e69 100644
--- a/airflow/utils/task_group.py
+++ b/airflow/utils/task_group.py
@@ -438,6 +438,18 @@ class TaskGroup(DAGNode):
return DagAttributeTypes.TASK_GROUP,
TaskGroupSerialization.serialize_task_group(self)
+ def hierarchical_alphabetical_sort(self):
+ """
+ Sort this group's direct children in hierarchical alphabetical order:
+ - child task groups first, ordered by ``node_id``;
+ - child tasks after them, ordered by ``node_id``.
+
+ :return: list of direct children (task groups and tasks); nested groups are not sorted here
+ """
+ return sorted(  # key (is_not_group, node_id): False < True puts groups before tasks; plain (case-sensitive) node_id comparison
+ self.children.values(), key=lambda node: (not isinstance(node,
TaskGroup), node.node_id)
+ )
+
def topological_sort(self, _include_subdag_tasks: bool = False):
"""
Sorts children in topographical order, such that a task comes after
any of its
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 32a0f0c19e..607927650b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -85,7 +85,12 @@ from airflow.api.common.mark_tasks import (
)
from airflow.configuration import AIRFLOW_CONFIG, conf
from airflow.datasets import Dataset
-from airflow.exceptions import AirflowException, ParamValidationError,
RemovedInAirflow3Warning
+from airflow.exceptions import (
+ AirflowConfigException,
+ AirflowException,
+ ParamValidationError,
+ RemovedInAirflow3Warning,
+)
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.job import Job
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
@@ -308,6 +313,14 @@ def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun],
session: Session):
grouped_tis = {task_id: list(tis) for task_id, tis in
itertools.groupby(query, key=lambda ti: ti.task_id)}
+ sort_order = conf.get("webserver", "grid_view_sorting_order",
fallback="topological")
+ if sort_order == "topological":
+ sort_children_fn = lambda task_group: task_group.topological_sort()
+ elif sort_order == "hierarchical_alphabetical":
+ sort_children_fn = lambda task_group:
task_group.hierarchical_alphabetical_sort()
+ else:
+ raise AirflowConfigException(f"Unsupported grid_view_sorting_order:
{sort_order}")
+
def task_group_to_grid(item, grouped_tis, *, is_parent_mapped: bool):
if not isinstance(item, TaskGroup):
@@ -384,7 +397,7 @@ def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun],
session: Session):
children = [
task_group_to_grid(child, grouped_tis,
is_parent_mapped=group_is_mapped)
- for child in task_group.topological_sort()
+ for child in sort_children_fn(task_group)
]
def get_summary(dag_run: DagRun):
diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py
index e7088afbd0..eb1e77e93f 100644
--- a/tests/utils/test_task_group.py
+++ b/tests/utils/test_task_group.py
@@ -1175,6 +1175,48 @@ def test_topological_nested_groups():
]
+def test_hierarchical_alphabetical_sort():  # groups precede tasks at every level, each alphabetical by id
+ execution_date = pendulum.parse("20200101")
+ with DAG("test_dag_edges", start_date=execution_date) as dag:
+ task1 = EmptyOperator(task_id="task1")
+ task5 = EmptyOperator(task_id="task5")
+ with TaskGroup("group_c"):  # declaration order (c, b, a; task9 before task8) deliberately differs from alphabetical
+ task7 = EmptyOperator(task_id="task7")
+ with TaskGroup("group_b"):
+ task6 = EmptyOperator(task_id="task6")
+ with TaskGroup("group_a"):
+ with TaskGroup("group_d"):  # nested group: must still come before group_a's own tasks
+ task2 = EmptyOperator(task_id="task2")
+ task3 = EmptyOperator(task_id="task3")
+ task4 = EmptyOperator(task_id="task4")
+ task9 = EmptyOperator(task_id="task9")
+ task8 = EmptyOperator(task_id="task8")
+
+ def nested(group):  # hierarchical_alphabetical_sort only orders direct children, so recurse into each TaskGroup here
+ return [
+ nested(node) if isinstance(node, TaskGroup) else node
+ for node in group.hierarchical_alphabetical_sort()
+ ]
+
+ sorted_list = nested(dag.task_group)  # start from the DAG's root task group
+
+ assert sorted_list == [  # each sub-list is one TaskGroup's sorted children
+ [ # group_a
+ [ # group_d
+ task2,
+ task3,
+ task4,
+ ],
+ task8,
+ task9,
+ ],
+ [task6], # group_b
+ [task7], # group_c
+ task1,
+ task5,
+ ]
+
+
def test_topological_group_dep():
execution_date = pendulum.parse("20200101")
with DAG("test_dag_edges", start_date=execution_date) as dag: