Repository: incubator-airflow
Updated Branches:
  refs/heads/master cadfae54b -> 3d6095ff5


[AIRFLOW-989] Do not mark dag run successful if unfinished tasks

Dag runs could be marked successful if all root tasks were successful,
even if some tasks had not run yet, e.g. after clearing. Now we also
check unfinished_tasks before marking the run successful.

Closes #2154 from bolkedebruin/AIRFLOW-989


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3d6095ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3d6095ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3d6095ff

Branch: refs/heads/master
Commit: 3d6095ff5cf6eff0444d7e47a2360765f2953daf
Parents: cadfae5
Author: Bolke de Bruin <[email protected]>
Authored: Wed Mar 15 16:39:12 2017 -0700
Committer: Bolke de Bruin <[email protected]>
Committed: Wed Mar 15 16:39:12 2017 -0700

----------------------------------------------------------------------
 airflow/models.py |  6 +++---
 tests/models.py   | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 54 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3d6095ff/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 27a5670..ad3346a 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4091,9 +4091,9 @@ class DagRun(Base):
                 logging.info('Marking run {} failed'.format(self))
                 self.state = State.FAILED
 
-            # if all roots succeeded, the run succeeded
-            elif all(r.state in (State.SUCCESS, State.SKIPPED)
-                     for r in roots):
+            # if all roots succeeded and no unfinished tasks, the run succeeded
+            elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED)
+                                              for r in roots):
                 logging.info('Marking run {} successful'.format(self))
                 self.state = State.SUCCESS
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3d6095ff/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 6fbbf3e..8ce08eb 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -259,6 +259,57 @@ class DagRunTest(unittest.TestCase):
         updated_dag_state = dag_run.update_state()
         self.assertEqual(State.SUCCESS, updated_dag_state)
 
+    def test_dagrun_success_conditions(self):
+        session = settings.Session()
+
+        dag = DAG(
+            'test_dagrun_success_conditions',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        # A -> B
+        # A -> C -> D
+        # ordered: B, D, C, A or D, B, C, A or D, C, B, A
+        with dag:
+            op1 = DummyOperator(task_id='A')
+            op2 = DummyOperator(task_id='B')
+            op3 = DummyOperator(task_id='C')
+            op4 = DummyOperator(task_id='D')
+            op1.set_upstream([op2, op3])
+            op3.set_upstream(op4)
+
+        dag.clear()
+
+        now = datetime.datetime.now()
+        dr = dag.create_dagrun(run_id='test_dagrun_success_conditions',
+                               state=State.RUNNING,
+                               execution_date=now,
+                               start_date=now)
+
+        # op1 = root
+        ti_op1 = dr.get_task_instance(task_id=op1.task_id)
+        ti_op1.set_state(state=State.SUCCESS, session=session)
+
+        ti_op2 = dr.get_task_instance(task_id=op2.task_id)
+        ti_op3 = dr.get_task_instance(task_id=op3.task_id)
+        ti_op4 = dr.get_task_instance(task_id=op4.task_id)
+
+        # root is successful, but unfinished tasks
+        state = dr.update_state()
+        self.assertEqual(State.RUNNING, state)
+
+        # one has failed, but root is successful
+        ti_op2.set_state(state=State.FAILED, session=session)
+        ti_op3.set_state(state=State.SUCCESS, session=session)
+        ti_op4.set_state(state=State.SUCCESS, session=session)
+        state = dr.update_state()
+        self.assertEqual(State.SUCCESS, state)
+
+        # upstream dependency failed, root has not run
+        ti_op1.set_state(State.NONE, session)
+        state = dr.update_state()
+        self.assertEqual(State.FAILED, state)
+
 
 class DagBagTest(unittest.TestCase):
 

Reply via email to