Repository: incubator-airflow Updated Branches: refs/heads/master f516c9ee5 -> 0da5125ed
[AIRFLOW-1081] Improve performance of duration chart This commit reduces the number of queries to improve perf. Closes #2226 from saguziel/aguziel-duration-chart-fix Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0da5125e Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0da5125e Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0da5125e Branch: refs/heads/master Commit: 0da5125edb03ea867add8c46de6705a5f4b542de Parents: f516c9e Author: Alex Guziel <[email protected]> Authored: Fri Apr 7 19:30:49 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Fri Apr 7 19:30:49 2017 +0200 ---------------------------------------------------------------------- airflow/www/views.py | 54 +++++++++++++++++++++++++++++------------------ 1 file changed, 33 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0da5125e/airflow/www/views.py ---------------------------------------------------------------------- diff --git a/airflow/www/views.py b/airflow/www/views.py index 0194e58..604ae66 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -25,6 +25,7 @@ import dateutil.parser import copy import json import bleach +from collections import defaultdict import inspect from textwrap import dedent @@ -1412,25 +1413,38 @@ class Airflow(BaseView): cum_chart = nvd3.lineChart( name="cumLineChart", x_is_date=True, height=600, width="1200") - y = {} - x = {} - cum_y = {} - for task in dag.tasks: - y[task.task_id] = [] - x[task.task_id] = [] - cum_y[task.task_id] = [] - for ti in task.get_task_instances(session, start_date=min_date, - end_date=base_date): - if ti.duration: - dttm = wwwutils.epoch(ti.execution_date) - x[ti.task_id].append(dttm) - y[ti.task_id].append(float(ti.duration)) - fails = 
session.query(models.TaskFail).filter_by( - task_id=ti.task_id, - dag_id=ti.dag_id, - execution_date=ti.execution_date).all() - fails_total = sum([f.duration for f in fails]) - cum_y[ti.task_id].append(float(ti.duration + fails_total)) + y = defaultdict(list) + x = defaultdict(list) + cum_y = defaultdict(list) + + tis = dag.get_task_instances( + session, start_date=min_date, end_date=base_date) + TF = models.TaskFail + ti_fails = ( + session + .query(TF) + .filter( + TF.dag_id == dag.dag_id, + TF.execution_date >= min_date, + TF.execution_date <= base_date, + TF.task_id.in_([t.task_id for t in dag.tasks])) + .all() + ) + + fails_totals = defaultdict(int) + for tf in ti_fails: + dict_key = (tf.dag_id, tf.task_id, tf.execution_date) + fails_totals[dict_key] += tf.duration + + for ti in tis: + if ti.duration: + dttm = wwwutils.epoch(ti.execution_date) + x[ti.task_id].append(dttm) + y[ti.task_id].append(float(ti.duration)) + fails_dict_key = (ti.dag_id, ti.task_id, ti.execution_date) + fails_total = fails_totals[fails_dict_key] + cum_y[ti.task_id].append(float(ti.duration + fails_total)) + # determine the most relevant time unit for the set of task instance # durations for the DAG y_unit = infer_time_unit([d for t in y.values() for d in t]) @@ -1448,8 +1462,6 @@ class Airflow(BaseView): y=scale_time_units(cum_y[task.task_id], cum_y_unit)) - tis = dag.get_task_instances( - session, start_date=min_date, end_date=base_date) dates = sorted(list({ti.execution_date for ti in tis})) max_date = max([ti.execution_date for ti in tis]) if dates else None
