Repository: incubator-airflow
Updated Branches:
  refs/heads/master 88d9b0dc9 -> ae44dddcb


[AIRFLOW-937] Improve performance of task_stats

Please accept this PR, which addresses the following issue:
- https://issues.apache.org/jira/browse/AIRFLOW-937

Testing Done:
- Should not change functionality significantly; it should pass the
existing tests (if any exist).

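This leads to slightly different results. The stats are now scoped to
DAGs marked active in DagModel (is_active == 1), rather than to the
non-subdag DAG ids and the task_ids loaded in the dagbag. In exchange,
it reduced the time of this endpoint from 90s to 9s on our data. The
existing task_ids filter was also already incorrect, because task_ids
may not be distinct across DAGs.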

Closes #2121 from saguziel/task-stats-fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ae44dddc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ae44dddc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ae44dddc

Branch: refs/heads/master
Commit: ae44dddcb6cdba6f4e4fa3fe56c3b87e0cd2e502
Parents: 88d9b0d
Author: Alex Guziel <[email protected]>
Authored: Thu Mar 2 14:04:49 2017 -0800
Committer: Dan Davydov <[email protected]>
Committed: Thu Mar 2 14:05:16 2017 -0800

----------------------------------------------------------------------
 airflow/www/views.py | 16 +++++-----------
 1 file changed, 5 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ae44dddc/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 0e065a6..f7d4584 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -497,26 +497,24 @@ class Airflow(BaseView):
 
     @expose('/task_stats')
     def task_stats(self):
-        task_ids = []
-        dag_ids = []
-        for dag in dagbag.dags.values():
-            task_ids += dag.task_ids
-            if not dag.is_subdag:
-                dag_ids.append(dag.dag_id)
-
         TI = models.TaskInstance
         DagRun = models.DagRun
+        Dag = models.DagModel
         session = Session()
 
         LastDagRun = (
             session.query(DagRun.dag_id, 
sqla.func.max(DagRun.execution_date).label('execution_date'))
+            .join(Dag, Dag.dag_id == DagRun.dag_id)
             .filter(DagRun.state != State.RUNNING)
+            .filter(Dag.is_active == 1)
             .group_by(DagRun.dag_id)
             .subquery('last_dag_run')
         )
         RunningDagRun = (
             session.query(DagRun.dag_id, DagRun.execution_date)
+            .join(Dag, Dag.dag_id == DagRun.dag_id)
             .filter(DagRun.state == State.RUNNING)
+            .filter(Dag.is_active == 1)
             .subquery('running_dag_run')
         )
 
@@ -527,16 +525,12 @@ class Airflow(BaseView):
             .join(LastDagRun, and_(
                 LastDagRun.c.dag_id == TI.dag_id,
                 LastDagRun.c.execution_date == TI.execution_date))
-            .filter(TI.task_id.in_(task_ids))
-            .filter(TI.dag_id.in_(dag_ids))
         )
         RunningTI = (
             session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
             .join(RunningDagRun, and_(
                 RunningDagRun.c.dag_id == TI.dag_id,
                 RunningDagRun.c.execution_date == TI.execution_date))
-            .filter(TI.task_id.in_(task_ids))
-            .filter(TI.dag_id.in_(dag_ids))
         )
 
         UnionTI = union_all(LastTI, RunningTI).alias('union_ti')

Reply via email to