[AIRFLOW-937] Improve performance of task_stats

Please accept this PR that addresses the following issue:
- https://issues.apache.org/jira/browse/AIRFLOW-937

Testing Done:
- Should not change functionality significantly; should pass the existing
  tests (if any exist).

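This leads to slightly different results: stats are now limited to DAGs
marked active in the database, rather than to the task_ids and non-subdag
dag_ids collected from the webserver's DagBag. In exchange, it reduced the
time of this endpoint from 90s to 9s on our data. Moreover, the existing
task_ids filter was already incorrect, because task_ids are not necessarily
distinct across DAGs.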

Closes #2121 from saguziel/task-stats-fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/66f39ca0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/66f39ca0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/66f39ca0

Branch: refs/heads/v1-8-test
Commit: 66f39ca0c3511da2ff86858ce7ea569d11adbd44
Parents: 0964f18
Author: Alex Guziel <[email protected]>
Authored: Thu Mar 2 14:04:49 2017 -0800
Committer: Bolke de Bruin <[email protected]>
Committed: Sun Mar 12 08:21:13 2017 -0700

----------------------------------------------------------------------
 airflow/www/views.py | 16 +++++-----------
 1 file changed, 5 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/66f39ca0/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index d8acfef..d1a1f9a 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -497,26 +497,24 @@ class Airflow(BaseView):
 
     @expose('/task_stats')
     def task_stats(self):
-        task_ids = []
-        dag_ids = []
-        for dag in dagbag.dags.values():
-            task_ids += dag.task_ids
-            if not dag.is_subdag:
-                dag_ids.append(dag.dag_id)
-
         TI = models.TaskInstance
         DagRun = models.DagRun
+        Dag = models.DagModel
         session = Session()
 
         LastDagRun = (
             session.query(DagRun.dag_id, 
sqla.func.max(DagRun.execution_date).label('execution_date'))
+            .join(Dag, Dag.dag_id == DagRun.dag_id)
             .filter(DagRun.state != State.RUNNING)
+            .filter(Dag.is_active == 1)
             .group_by(DagRun.dag_id)
             .subquery('last_dag_run')
         )
         RunningDagRun = (
             session.query(DagRun.dag_id, DagRun.execution_date)
+            .join(Dag, Dag.dag_id == DagRun.dag_id)
             .filter(DagRun.state == State.RUNNING)
+            .filter(Dag.is_active == 1)
             .subquery('running_dag_run')
         )
 
@@ -527,16 +525,12 @@ class Airflow(BaseView):
             .join(LastDagRun, and_(
                 LastDagRun.c.dag_id == TI.dag_id,
                 LastDagRun.c.execution_date == TI.execution_date))
-            .filter(TI.task_id.in_(task_ids))
-            .filter(TI.dag_id.in_(dag_ids))
         )
         RunningTI = (
             session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
             .join(RunningDagRun, and_(
                 RunningDagRun.c.dag_id == TI.dag_id,
                 RunningDagRun.c.execution_date == TI.execution_date))
-            .filter(TI.task_id.in_(task_ids))
-            .filter(TI.dag_id.in_(dag_ids))
         )
 
         UnionTI = union_all(LastTI, RunningTI).alias('union_ti')

Reply via email to