Repository: incubator-airflow Updated Branches: refs/heads/master 0923356a5 -> a2ed55f2c
[AIRFLOW-246] Improve dag_stats endpoint query Currently, accessing /dag_stats can take a relatively long time (e.g. over 20 seconds with fewer than a million rows in some environments). This patch replaces multiple LEFT OUTER JOINs with INNER JOINs and UNION ALL, making the query 3-5x faster. Closes #1610 from sekikn/AIRFLOW-246 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a2ed55f2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a2ed55f2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a2ed55f2 Branch: refs/heads/master Commit: a2ed55f2c40857efaaf1876de5bc7929cbfb3166 Parents: 0923356 Author: Kengo Seki <[email protected]> Authored: Wed Jun 29 22:19:26 2016 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Wed Jun 29 22:19:26 2016 +0200 ---------------------------------------------------------------------- airflow/www/views.py | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a2ed55f2/airflow/www/views.py ---------------------------------------------------------------------- diff --git a/airflow/www/views.py b/airflow/www/views.py index 5c18e81..1d0fdd7 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -31,7 +31,7 @@ import inspect import traceback import sqlalchemy as sqla -from sqlalchemy import or_, desc, and_ +from sqlalchemy import or_, desc, and_, union_all from flask import ( redirect, url_for, request, Markup, Response, current_app, render_template) @@ -497,23 +497,27 @@ class Airflow(BaseView): # Select all task_instances from active dag_runs. # If no dag_run is active, return task instances from most recent dag_run.
- qry = ( - session.query(TI.dag_id, TI.state, sqla.func.count(TI.task_id)) - .outerjoin(RunningDagRun, and_( - RunningDagRun.c.dag_id == TI.dag_id, - RunningDagRun.c.execution_date == TI.execution_date) - ) - .outerjoin(LastDagRun, and_( + LastTI = ( + session.query(TI.dag_id.label('dag_id'), TI.state.label('state')) + .join(LastDagRun, and_( LastDagRun.c.dag_id == TI.dag_id, - LastDagRun.c.execution_date == TI.execution_date) - ) + LastDagRun.c.execution_date == TI.execution_date)) .filter(TI.task_id.in_(task_ids)) .filter(TI.dag_id.in_(dag_ids)) - .filter(or_( - RunningDagRun.c.dag_id != None, - LastDagRun.c.dag_id != None - )) - .group_by(TI.dag_id, TI.state) + ) + RunningTI = ( + session.query(TI.dag_id.label('dag_id'), TI.state.label('state')) + .join(RunningDagRun, and_( + RunningDagRun.c.dag_id == TI.dag_id, + RunningDagRun.c.execution_date == TI.execution_date)) + .filter(TI.task_id.in_(task_ids)) + .filter(TI.dag_id.in_(dag_ids)) + ) + + UnionTI = union_all(LastTI, RunningTI).alias('union_ti') + qry = ( + session.query(UnionTI.c.dag_id, UnionTI.c.state, sqla.func.count()) + .group_by(UnionTI.c.dag_id, UnionTI.c.state) ) data = {}
