[AIRFLOW-937] Improve performance of task_stats. Please accept this PR, which addresses the following issue: - https://issues.apache.org/jira/browse/AIRFLOW-937
Testing Done: - Shouldn't change functionality significantly; should pass existing tests (if they exist). This leads to slightly different results (DAGs are now selected by joining on DagModel.is_active rather than by filtering on the task_ids/dag_ids collected from the in-memory dagbag), but it reduced the time of this endpoint from 90s to 9s on our data, and the existing logic for task_ids was already incorrect (task_ids may not be distinct across DAGs). Closes #2121 from saguziel/task-stats-fix Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/66f39ca0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/66f39ca0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/66f39ca0 Branch: refs/heads/v1-8-test Commit: 66f39ca0c3511da2ff86858ce7ea569d11adbd44 Parents: 0964f18 Author: Alex Guziel <[email protected]> Authored: Thu Mar 2 14:04:49 2017 -0800 Committer: Bolke de Bruin <[email protected]> Committed: Sun Mar 12 08:21:13 2017 -0700 ---------------------------------------------------------------------- airflow/www/views.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/66f39ca0/airflow/www/views.py ---------------------------------------------------------------------- diff --git a/airflow/www/views.py b/airflow/www/views.py index d8acfef..d1a1f9a 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -497,26 +497,24 @@ class Airflow(BaseView): @expose('/task_stats') def task_stats(self): - task_ids = [] - dag_ids = [] - for dag in dagbag.dags.values(): - task_ids += dag.task_ids - if not dag.is_subdag: - dag_ids.append(dag.dag_id) - TI = models.TaskInstance DagRun = models.DagRun + Dag = models.DagModel session = Session() LastDagRun = ( session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date')) + .join(Dag, Dag.dag_id == DagRun.dag_id) .filter(DagRun.state != State.RUNNING) + 
.filter(Dag.is_active == 1) .group_by(DagRun.dag_id) .subquery('last_dag_run') ) RunningDagRun = ( session.query(DagRun.dag_id, DagRun.execution_date) + .join(Dag, Dag.dag_id == DagRun.dag_id) .filter(DagRun.state == State.RUNNING) + .filter(Dag.is_active == 1) .subquery('running_dag_run') ) @@ -527,16 +525,12 @@ class Airflow(BaseView): .join(LastDagRun, and_( LastDagRun.c.dag_id == TI.dag_id, LastDagRun.c.execution_date == TI.execution_date)) - .filter(TI.task_id.in_(task_ids)) - .filter(TI.dag_id.in_(dag_ids)) ) RunningTI = ( session.query(TI.dag_id.label('dag_id'), TI.state.label('state')) .join(RunningDagRun, and_( RunningDagRun.c.dag_id == TI.dag_id, RunningDagRun.c.execution_date == TI.execution_date)) - .filter(TI.task_id.in_(task_ids)) - .filter(TI.dag_id.in_(dag_ids)) ) UnionTI = union_all(LastTI, RunningTI).alias('union_ti')
