This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1cf483fa0c Fix and speed up grid view (#23947)
1cf483fa0c is described below
commit 1cf483fa0c45e0110d99e37b4e45c72c6084aa97
Author: Jed Cunningham <[email protected]>
AuthorDate: Thu May 26 13:53:22 2022 -0600
Fix and speed up grid view (#23947)
This fetches all TIs for a given task across dag runs, leading to
significantly faster response times. It also fixes a bug where Nones
were being passed to the UI when a new task was added to a DAG with
existing runs.
---
airflow/www/utils.py | 57 ++++++++++++++++++++++++++--------------------------
airflow/www/views.py | 2 +-
2 files changed, 29 insertions(+), 30 deletions(-)
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 2bdc2939bf..0a7de05abf 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -128,39 +128,38 @@ def get_mapped_summary(parent_instance, task_instances):
}
-def get_task_summary(dag_run: DagRun, task, session: Session) ->
Optional[Dict[str, Any]]:
- task_instance = (
- session.query(TaskInstance)
- .filter(
- TaskInstance.dag_id == task.dag_id,
- TaskInstance.run_id == dag_run.run_id,
- TaskInstance.task_id == task.task_id,
- # Only get normal task instances or the first mapped task
- TaskInstance.map_index <= 0,
- )
- .first()
+def get_task_summaries(task, dag_runs: List[DagRun], session: Session) ->
List[Dict[str, Any]]:
+ tis = session.query(TaskInstance).filter(
+ TaskInstance.dag_id == task.dag_id,
+ TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]),
+ TaskInstance.task_id == task.task_id,
+ # Only get normal task instances or the first mapped task
+ TaskInstance.map_index <= 0,
)
- if not task_instance:
- return None
+ def _get_summary(task_instance):
+ if task_instance.map_index > -1:
+ return get_mapped_summary(
+ task_instance,
task_instances=get_mapped_instances(task_instance, session)
+ )
- if task_instance.map_index > -1:
- return get_mapped_summary(task_instance,
task_instances=get_mapped_instances(task_instance, session))
+ try_count = (
+ task_instance.prev_attempted_tries
+ if task_instance.prev_attempted_tries != 0
+ else task_instance.try_number
+ )
- try_count = (
- task_instance.prev_attempted_tries
- if task_instance.prev_attempted_tries != 0
- else task_instance.try_number
- )
- return {
- 'task_id': task_instance.task_id,
- 'run_id': task_instance.run_id,
- 'map_index': task_instance.map_index,
- 'state': task_instance.state,
- 'start_date': datetime_to_string(task_instance.start_date),
- 'end_date': datetime_to_string(task_instance.end_date),
- 'try_number': try_count,
- }
+ return {
+ 'task_id': task_instance.task_id,
+ 'run_id': task_instance.run_id,
+ 'map_index': task_instance.map_index,
+ 'state': task_instance.state,
+ 'start_date': datetime_to_string(task_instance.start_date),
+ 'end_date': datetime_to_string(task_instance.end_date),
+ 'try_number': try_count,
+ }
+
+ return [_get_summary(ti) for ti in tis]
def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str,
Any]]:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 4c3d33a6b1..29e15bfe16 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -258,7 +258,7 @@ def task_group_to_grid(task_item_or_group, dag, dag_runs,
session):
if isinstance(task_item_or_group, AbstractOperator):
return {
'id': task_item_or_group.task_id,
- 'instances': [wwwutils.get_task_summary(dr, task_item_or_group,
session) for dr in dag_runs],
+ 'instances': wwwutils.get_task_summaries(task_item_or_group,
dag_runs, session),
'label': task_item_or_group.label,
'extra_links': task_item_or_group.extra_links,
'is_mapped': task_item_or_group.is_mapped,