Repository: incubator-airflow Updated Branches: refs/heads/master 16d195711 -> 55b56a433
[AIRFLOW-2602] Show failed attempts in Gantt view * Get failed attempts from `task_fail` table and show them in Gantt view. * Fix TaskFail class column definition to match actual database schema. Change was required to retrieve all records for one task instance run otherwise only one was returned by SQLAlchemy. Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c989972d Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c989972d Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c989972d Branch: refs/heads/master Commit: c989972d2df8f9cf5a550e3cbf57ae12330ad437 Parents: 3162e1c Author: Stefan Seelmann <[email protected]> Authored: Tue Jun 12 23:09:31 2018 +0200 Committer: Stefan Seelmann <[email protected]> Committed: Thu Jun 21 07:26:29 2018 +0200 ---------------------------------------------------------------------- airflow/www/views.py | 18 +++++++++++++++--- airflow/www_rbac/views.py | 18 +++++++++++++++--- 2 files changed, 30 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c989972d/airflow/www/views.py ---------------------------------------------------------------------- diff --git a/airflow/www/views.py b/airflow/www/views.py index 8f6725e..c8baa5c 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -35,6 +35,7 @@ import bleach import pendulum import codecs from collections import defaultdict +import itertools import inspect from textwrap import dedent @@ -1816,10 +1817,21 @@ class Airflow(BaseView): ti for ti in dag.get_task_instances(session, dttm, dttm) if ti.start_date] tis = sorted(tis, key=lambda ti: ti.start_date) + TF = models.TaskFail + ti_fails = list(itertools.chain(*[( + session + .query(TF) + .filter(TF.dag_id == ti.dag_id, + TF.task_id == ti.task_id, + TF.execution_date == ti.execution_date) + .all() + ) for ti in tis])) + tis_with_fails = sorted(tis + ti_fails, key=lambda ti: ti.start_date) tasks = [] - for ti in tis: + for ti in tis_with_fails: end_date = ti.end_date if ti.end_date else timezone.utcnow() + state = ti.state if type(ti) == models.TaskInstance else State.FAILED tasks.append({ 'startDate': wwwutils.epoch(ti.start_date), 'endDate': wwwutils.epoch(end_date), @@ -1827,10 +1839,10 @@ class Airflow(BaseView): 'isoEnd': end_date.isoformat()[:-4], 'taskName': ti.task_id, 'duration': "{}".format(end_date - ti.start_date)[:-4], - 'status': ti.state, + 'status': state, 'executionDate': ti.execution_date.isoformat(), }) - states = {ti.state: ti.state for ti in tis} + states = {task['status']: task['status'] for task in tasks} data = { 'taskNames': [ti.task_id for ti in tis], 'tasks': tasks, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c989972d/airflow/www_rbac/views.py ---------------------------------------------------------------------- diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py index 43e481e..91bd177 100644 --- a/airflow/www_rbac/views.py +++ b/airflow/www_rbac/views.py @@ -32,6 +32,7 @@ import traceback import markdown import nvd3 import pendulum +import itertools import sqlalchemy as sqla from sqlalchemy import or_, desc, and_, union_all @@ -1496,10 +1497,21 @@ class Airflow(AirflowBaseView): ti for ti in dag.get_task_instances(session, dttm, dttm) if ti.start_date] tis = sorted(tis, key=lambda ti: ti.start_date) + TF = models.TaskFail + ti_fails = list(itertools.chain(*[( + session + .query(TF) + .filter(TF.dag_id == ti.dag_id, + TF.task_id == ti.task_id, + TF.execution_date == ti.execution_date) + .all() + ) for ti in tis])) + tis_with_fails = sorted(tis + ti_fails, key=lambda ti: ti.start_date) tasks = [] - for ti in tis: + for ti in tis_with_fails: end_date = ti.end_date if ti.end_date else timezone.utcnow() + state = ti.state if type(ti) == models.TaskInstance else State.FAILED tasks.append({ 'startDate': wwwutils.epoch(ti.start_date), 'endDate': wwwutils.epoch(end_date), @@ -1507,10 +1519,10 @@ class Airflow(AirflowBaseView): 'isoEnd': end_date.isoformat()[:-4], 'taskName': ti.task_id, 'duration': "{}".format(end_date - ti.start_date)[:-4], - 'status': ti.state, + 'status': state, 'executionDate': ti.execution_date.isoformat(), }) - states = {ti.state: ti.state for ti in tis} + states = {task['status']: task['status'] for task in tasks} data = { 'taskNames': [ti.task_id for ti in tis], 'tasks': tasks,
