Repository: incubator-airflow Updated Branches: refs/heads/master 88d9b0dc9 -> ae44dddcb
[AIRFLOW-937] Improve performance of task_stats Please accept this PR that addresses the following issues: - https://issues.apache.org/jira/browse/AIRFLOW-937 Testing Done: - Shouldn't change functionality significantly, should pass existing tests (if they exist). This leads to slightly different results, but it reduced the time of this endpoint from 90s to 9s on our data, and the existing logic for task_ids was already incorrect (task_ids may not be distinct across dags). Closes #2121 from saguziel/task-stats-fix Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ae44dddc Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ae44dddc Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ae44dddc Branch: refs/heads/master Commit: ae44dddcb6cdba6f4e4fa3fe56c3b87e0cd2e502 Parents: 88d9b0d Author: Alex Guziel <[email protected]> Authored: Thu Mar 2 14:04:49 2017 -0800 Committer: Dan Davydov <[email protected]> Committed: Thu Mar 2 14:05:16 2017 -0800 ---------------------------------------------------------------------- airflow/www/views.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ae44dddc/airflow/www/views.py ---------------------------------------------------------------------- diff --git a/airflow/www/views.py b/airflow/www/views.py index 0e065a6..f7d4584 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -497,26 +497,24 @@ class Airflow(BaseView): @expose('/task_stats') def task_stats(self): - task_ids = [] - dag_ids = [] - for dag in dagbag.dags.values(): - task_ids += dag.task_ids - if not dag.is_subdag: - dag_ids.append(dag.dag_id) - TI = models.TaskInstance DagRun = models.DagRun + Dag = models.DagModel session = Session() LastDagRun = ( session.query(DagRun.dag_id, 
sqla.func.max(DagRun.execution_date).label('execution_date')) + .join(Dag, Dag.dag_id == DagRun.dag_id) .filter(DagRun.state != State.RUNNING) + .filter(Dag.is_active == 1) .group_by(DagRun.dag_id) .subquery('last_dag_run') ) RunningDagRun = ( session.query(DagRun.dag_id, DagRun.execution_date) + .join(Dag, Dag.dag_id == DagRun.dag_id) .filter(DagRun.state == State.RUNNING) + .filter(Dag.is_active == 1) .subquery('running_dag_run') ) @@ -527,16 +525,12 @@ class Airflow(BaseView): .join(LastDagRun, and_( LastDagRun.c.dag_id == TI.dag_id, LastDagRun.c.execution_date == TI.execution_date)) - .filter(TI.task_id.in_(task_ids)) - .filter(TI.dag_id.in_(dag_ids)) ) RunningTI = ( session.query(TI.dag_id.label('dag_id'), TI.state.label('state')) .join(RunningDagRun, and_( RunningDagRun.c.dag_id == TI.dag_id, RunningDagRun.c.execution_date == TI.execution_date)) - .filter(TI.task_id.in_(task_ids)) - .filter(TI.dag_id.in_(dag_ids)) ) UnionTI = union_all(LastTI, RunningTI).alias('union_ti')
