This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ab5ea7853 Grid data: do not load all mapped instances (#23813)
7ab5ea7853 is described below

commit 7ab5ea7853df9d99f6da3ab804ffe085378fbd8a
Author: Brent Bovenzi <[email protected]>
AuthorDate: Fri May 20 00:18:17 2022 -0400

    Grid data: do not load all mapped instances (#23813)
    
    * only get necessary task instances
    
    * add comment
    
    * encode_ti -> get_task_summary
---
 airflow/www/utils.py | 19 +++++++++++++++----
 airflow/www/views.py | 34 ++++++++--------------------------
 2 files changed, 23 insertions(+), 30 deletions(-)

diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 05ad2a8ab1..2bdc2939bf 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -40,6 +40,7 @@ from sqlalchemy.orm import Session
 
 from airflow import models
 from airflow.models import errors
+from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
 from airflow.utils.code_utils import get_python_source
@@ -127,13 +128,23 @@ def get_mapped_summary(parent_instance, task_instances):
     }
 
 
-def encode_ti(
-    task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Optional[Session]
-) -> Optional[Dict[str, Any]]:
+def get_task_summary(dag_run: DagRun, task, session: Session) -> Optional[Dict[str, Any]]:
+    task_instance = (
+        session.query(TaskInstance)
+        .filter(
+            TaskInstance.dag_id == task.dag_id,
+            TaskInstance.run_id == dag_run.run_id,
+            TaskInstance.task_id == task.task_id,
+            # Only get normal task instances or the first mapped task
+            TaskInstance.map_index <= 0,
+        )
+        .first()
+    )
+
     if not task_instance:
         return None
 
-    if is_mapped:
+    if task_instance.map_index > -1:
         return get_mapped_summary(task_instance, task_instances=get_mapped_instances(task_instance, session))
 
     try_count = (
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 19c93735c9..b66c38140e 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -250,7 +250,7 @@ def _safe_parse_datetime(v):
         abort(400, f"Invalid datetime: {v!r}")
 
 
-def task_group_to_grid(task_item_or_group, dag, dag_runs, tis, session):
+def task_group_to_grid(task_item_or_group, dag, dag_runs, session):
     """
     Create a nested dict representation of this TaskGroup and its children used to construct
     the Graph.
@@ -258,11 +258,7 @@ def task_group_to_grid(task_item_or_group, dag, dag_runs, tis, session):
     if isinstance(task_item_or_group, AbstractOperator):
         return {
             'id': task_item_or_group.task_id,
-            'instances': [
-                wwwutils.encode_ti(ti, task_item_or_group.is_mapped, session)
-                for ti in tis
-                if ti.task_id == task_item_or_group.task_id
-            ],
+            'instances': [wwwutils.get_task_summary(dr, task_item_or_group, session) for dr in dag_runs],
             'label': task_item_or_group.label,
             'extra_links': task_item_or_group.extra_links,
             'is_mapped': task_item_or_group.is_mapped,
@@ -271,21 +267,15 @@ def task_group_to_grid(task_item_or_group, dag, dag_runs, tis, session):
     # Task Group
     task_group = task_item_or_group
 
-    children = [
-        task_group_to_grid(child, dag, dag_runs, tis, session) for child in task_group.topological_sort()
-    ]
+    children = [task_group_to_grid(child, dag, dag_runs, session) for child in task_group.topological_sort()]
 
     def get_summary(dag_run, children):
         child_instances = [child['instances'] for child in children if 'instances' in child]
         child_instances = [item for sublist in child_instances for item in sublist]
 
-        children_start_dates = [
-            item['start_date'] for item in child_instances if item['run_id'] == dag_run.run_id
-        ]
-        children_end_dates = [
-            item['end_date'] for item in child_instances if item['run_id'] == dag_run.run_id
-        ]
-        children_states = [item['state'] for item in child_instances if item['run_id'] == dag_run.run_id]
+        children_start_dates = [item['start_date'] for item in child_instances if item]
+        children_end_dates = [item['end_date'] for item in child_instances if item]
+        children_states = [item['state'] for item in child_instances if item]
 
         group_state = None
         for state in wwwutils.priority:
@@ -2642,12 +2632,8 @@ class Airflow(AirflowBaseView):
         else:
             external_log_name = None
 
-        min_date = min(dag_run_dates, default=None)
-
-        tis = dag.get_task_instances(start_date=min_date, end_date=base_date, session=session)
-
         data = {
-            'groups': task_group_to_grid(dag.task_group, dag, dag_runs, tis, session),
+            'groups': task_group_to_grid(dag.task_group, dag, dag_runs, session),
             'dag_runs': encoded_runs,
         }
 
@@ -2675,7 +2661,6 @@ class Airflow(AirflowBaseView):
             dag_model=dag_model,
             auto_refresh_interval=conf.getint('webserver', 'auto_refresh_interval'),
             default_dag_run_display_number=default_dag_run_display_number,
-            task_instances=tis,
             filters_drop_down_values=htmlsafe_json_dumps(
                 {
                     "taskStates": [state.value for state in TaskInstanceState],
@@ -3542,11 +3527,8 @@ class Airflow(AirflowBaseView):
             dag_runs = query.order_by(DagRun.execution_date.desc()).limit(num_runs).all()
             dag_runs.reverse()
             encoded_runs = [wwwutils.encode_dag_run(dr) for dr in dag_runs]
-            dag_run_dates = {dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}
-            min_date = min(dag_run_dates, default=None)
-            tis = dag.get_task_instances(start_date=min_date, end_date=base_date, session=session)
             data = {
-                'groups': task_group_to_grid(dag.task_group, dag, dag_runs, tis, session),
+                'groups': task_group_to_grid(dag.task_group, dag, dag_runs, session),
                 'dag_runs': encoded_runs,
             }
 

Reply via email to