This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7ab5ea7853 Grid data: do not load all mapped instances (#23813)
7ab5ea7853 is described below
commit 7ab5ea7853df9d99f6da3ab804ffe085378fbd8a
Author: Brent Bovenzi <[email protected]>
AuthorDate: Fri May 20 00:18:17 2022 -0400
Grid data: do not load all mapped instances (#23813)
* only get necessary task instances
* add comment
* encode_ti -> get_task_summary
---
airflow/www/utils.py | 19 +++++++++++++++----
airflow/www/views.py | 34 ++++++++--------------------------
2 files changed, 23 insertions(+), 30 deletions(-)
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 05ad2a8ab1..2bdc2939bf 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -40,6 +40,7 @@ from sqlalchemy.orm import Session
from airflow import models
from airflow.models import errors
+from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
from airflow.utils.code_utils import get_python_source
@@ -127,13 +128,23 @@ def get_mapped_summary(parent_instance, task_instances):
}
-def encode_ti(
-    task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Optional[Session]
-) -> Optional[Dict[str, Any]]:
+def get_task_summary(dag_run: DagRun, task, session: Session) -> Optional[Dict[str, Any]]:
+ task_instance = (
+ session.query(TaskInstance)
+ .filter(
+ TaskInstance.dag_id == task.dag_id,
+ TaskInstance.run_id == dag_run.run_id,
+ TaskInstance.task_id == task.task_id,
+ # Only get normal task instances or the first mapped task
+ TaskInstance.map_index <= 0,
+ )
+ .first()
+ )
+
if not task_instance:
return None
- if is_mapped:
+ if task_instance.map_index > -1:
        return get_mapped_summary(task_instance, task_instances=get_mapped_instances(task_instance, session))
try_count = (
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 19c93735c9..b66c38140e 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -250,7 +250,7 @@ def _safe_parse_datetime(v):
abort(400, f"Invalid datetime: {v!r}")
-def task_group_to_grid(task_item_or_group, dag, dag_runs, tis, session):
+def task_group_to_grid(task_item_or_group, dag, dag_runs, session):
"""
    Create a nested dict representation of this TaskGroup and its children used to construct the Graph.
@@ -258,11 +258,7 @@ def task_group_to_grid(task_item_or_group, dag, dag_runs, tis, session):
if isinstance(task_item_or_group, AbstractOperator):
return {
'id': task_item_or_group.task_id,
-            'instances': [
-                wwwutils.encode_ti(ti, task_item_or_group.is_mapped, session)
-                for ti in tis
-                if ti.task_id == task_item_or_group.task_id
-            ],
+            'instances': [wwwutils.get_task_summary(dr, task_item_or_group, session) for dr in dag_runs],
'label': task_item_or_group.label,
'extra_links': task_item_or_group.extra_links,
'is_mapped': task_item_or_group.is_mapped,
@@ -271,21 +267,15 @@ def task_group_to_grid(task_item_or_group, dag, dag_runs, tis, session):
# Task Group
task_group = task_item_or_group
-    children = [
-        task_group_to_grid(child, dag, dag_runs, tis, session) for child in task_group.topological_sort()
-    ]
+    children = [task_group_to_grid(child, dag, dag_runs, session) for child in task_group.topological_sort()]
def get_summary(dag_run, children):
        child_instances = [child['instances'] for child in children if 'instances' in child]
        child_instances = [item for sublist in child_instances for item in sublist]
-        children_start_dates = [
-            item['start_date'] for item in child_instances if item['run_id'] == dag_run.run_id
-        ]
-        children_end_dates = [
-            item['end_date'] for item in child_instances if item['run_id'] == dag_run.run_id
-        ]
-        children_states = [item['state'] for item in child_instances if item['run_id'] == dag_run.run_id]
+        children_start_dates = [item['start_date'] for item in child_instances if item]
+        children_end_dates = [item['end_date'] for item in child_instances if item]
+        children_states = [item['state'] for item in child_instances if item]
group_state = None
for state in wwwutils.priority:
@@ -2642,12 +2632,8 @@ class Airflow(AirflowBaseView):
else:
external_log_name = None
-        min_date = min(dag_run_dates, default=None)
-
-        tis = dag.get_task_instances(start_date=min_date, end_date=base_date, session=session)
-
         data = {
-            'groups': task_group_to_grid(dag.task_group, dag, dag_runs, tis, session),
+            'groups': task_group_to_grid(dag.task_group, dag, dag_runs, session),
'dag_runs': encoded_runs,
}
@@ -2675,7 +2661,6 @@ class Airflow(AirflowBaseView):
dag_model=dag_model,
            auto_refresh_interval=conf.getint('webserver', 'auto_refresh_interval'),
default_dag_run_display_number=default_dag_run_display_number,
- task_instances=tis,
filters_drop_down_values=htmlsafe_json_dumps(
{
"taskStates": [state.value for state in TaskInstanceState],
@@ -3542,11 +3527,8 @@ class Airflow(AirflowBaseView):
            dag_runs = query.order_by(DagRun.execution_date.desc()).limit(num_runs).all()
dag_runs.reverse()
encoded_runs = [wwwutils.encode_dag_run(dr) for dr in dag_runs]
-            dag_run_dates = {dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}
-            min_date = min(dag_run_dates, default=None)
-            tis = dag.get_task_instances(start_date=min_date, end_date=base_date, session=session)
             data = {
-                'groups': task_group_to_grid(dag.task_group, dag, dag_runs, tis, session),
+                'groups': task_group_to_grid(dag.task_group, dag, dag_runs, session),
'dag_runs': encoded_runs,
}