Repository: incubator-airflow
Updated Branches:
  refs/heads/master 16d195711 -> 55b56a433


[AIRFLOW-2602] Show failed attempts in Gantt view

* Get failed attempts from `task_fail` table and show them in Gantt view.
* Fix TaskFail class column definition to match actual database schema.
  Change was required to retrieve all records for one task instance run
  otherwise only one was returned by SQLAlchemy.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c989972d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c989972d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c989972d

Branch: refs/heads/master
Commit: c989972d2df8f9cf5a550e3cbf57ae12330ad437
Parents: 3162e1c
Author: Stefan Seelmann <[email protected]>
Authored: Tue Jun 12 23:09:31 2018 +0200
Committer: Stefan Seelmann <[email protected]>
Committed: Thu Jun 21 07:26:29 2018 +0200

----------------------------------------------------------------------
 airflow/www/views.py      | 18 +++++++++++++++---
 airflow/www_rbac/views.py | 18 +++++++++++++++---
 2 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c989972d/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 8f6725e..c8baa5c 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -35,6 +35,7 @@ import bleach
 import pendulum
 import codecs
 from collections import defaultdict
+import itertools
 
 import inspect
 from textwrap import dedent
@@ -1816,10 +1817,21 @@ class Airflow(BaseView):
             ti for ti in dag.get_task_instances(session, dttm, dttm)
             if ti.start_date]
         tis = sorted(tis, key=lambda ti: ti.start_date)
+        TF = models.TaskFail
+        ti_fails = list(itertools.chain(*[(
+            session
+            .query(TF)
+            .filter(TF.dag_id == ti.dag_id,
+                    TF.task_id == ti.task_id,
+                    TF.execution_date == ti.execution_date)
+            .all()
+        ) for ti in tis]))
+        tis_with_fails = sorted(tis + ti_fails, key=lambda ti: ti.start_date)
 
         tasks = []
-        for ti in tis:
+        for ti in tis_with_fails:
             end_date = ti.end_date if ti.end_date else timezone.utcnow()
+            state = ti.state if type(ti) == models.TaskInstance else 
State.FAILED
             tasks.append({
                 'startDate': wwwutils.epoch(ti.start_date),
                 'endDate': wwwutils.epoch(end_date),
@@ -1827,10 +1839,10 @@ class Airflow(BaseView):
                 'isoEnd': end_date.isoformat()[:-4],
                 'taskName': ti.task_id,
                 'duration': "{}".format(end_date - ti.start_date)[:-4],
-                'status': ti.state,
+                'status': state,
                 'executionDate': ti.execution_date.isoformat(),
             })
-        states = {ti.state: ti.state for ti in tis}
+        states = {task['status']: task['status'] for task in tasks}
         data = {
             'taskNames': [ti.task_id for ti in tis],
             'tasks': tasks,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c989972d/airflow/www_rbac/views.py
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 43e481e..91bd177 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -32,6 +32,7 @@ import traceback
 import markdown
 import nvd3
 import pendulum
+import itertools
 
 import sqlalchemy as sqla
 from sqlalchemy import or_, desc, and_, union_all
@@ -1496,10 +1497,21 @@ class Airflow(AirflowBaseView):
             ti for ti in dag.get_task_instances(session, dttm, dttm)
             if ti.start_date]
         tis = sorted(tis, key=lambda ti: ti.start_date)
+        TF = models.TaskFail
+        ti_fails = list(itertools.chain(*[(
+            session
+            .query(TF)
+            .filter(TF.dag_id == ti.dag_id,
+                    TF.task_id == ti.task_id,
+                    TF.execution_date == ti.execution_date)
+            .all()
+        ) for ti in tis]))
+        tis_with_fails = sorted(tis + ti_fails, key=lambda ti: ti.start_date)
 
         tasks = []
-        for ti in tis:
+        for ti in tis_with_fails:
             end_date = ti.end_date if ti.end_date else timezone.utcnow()
+            state = ti.state if type(ti) == models.TaskInstance else 
State.FAILED
             tasks.append({
                 'startDate': wwwutils.epoch(ti.start_date),
                 'endDate': wwwutils.epoch(end_date),
@@ -1507,10 +1519,10 @@ class Airflow(AirflowBaseView):
                 'isoEnd': end_date.isoformat()[:-4],
                 'taskName': ti.task_id,
                 'duration': "{}".format(end_date - ti.start_date)[:-4],
-                'status': ti.state,
+                'status': state,
                 'executionDate': ti.execution_date.isoformat(),
             })
-        states = {ti.state: ti.state for ti in tis}
+        states = {task['status']: task['status'] for task in tasks}
         data = {
             'taskNames': [ti.task_id for ti in tis],
             'tasks': tasks,

Reply via email to