Repository: incubator-airflow Updated Branches: refs/heads/master cadfae54b -> 3d6095ff5
[AIRFLOW-989] Do not mark dag run successful if unfinished tasks Dag runs could be marked successful if all root tasks were successful, even if some tasks did not run yet, ie. in case of clearing. Now we consider unfinished_tasks, before marking successful. Closes #2154 from bolkedebruin/AIRFLOW-989 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3d6095ff Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3d6095ff Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3d6095ff Branch: refs/heads/master Commit: 3d6095ff5cf6eff0444d7e47a2360765f2953daf Parents: cadfae5 Author: Bolke de Bruin <[email protected]> Authored: Wed Mar 15 16:39:12 2017 -0700 Committer: Bolke de Bruin <[email protected]> Committed: Wed Mar 15 16:39:12 2017 -0700 ---------------------------------------------------------------------- airflow/models.py | 6 +++--- tests/models.py | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3d6095ff/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 27a5670..ad3346a 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -4091,9 +4091,9 @@ class DagRun(Base): logging.info('Marking run {} failed'.format(self)) self.state = State.FAILED - # if all roots succeeded, the run succeeded - elif all(r.state in (State.SUCCESS, State.SKIPPED) - for r in roots): + # if all roots succeeded and no unfinished tasks, the run succeeded + elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED) + for r in roots): logging.info('Marking run {} successful'.format(self)) self.state = State.SUCCESS http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3d6095ff/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index 6fbbf3e..8ce08eb 100644 --- a/tests/models.py +++ b/tests/models.py @@ -259,6 +259,57 @@ class DagRunTest(unittest.TestCase): updated_dag_state = dag_run.update_state() self.assertEqual(State.SUCCESS, updated_dag_state) + def test_dagrun_success_conditions(self): + session = settings.Session() + + dag = DAG( + 'test_dagrun_success_conditions', + start_date=DEFAULT_DATE, + default_args={'owner': 'owner1'}) + + # A -> B + # A -> C -> D + # ordered: B, D, C, A or D, B, C, A or D, C, B, A + with dag: + op1 = DummyOperator(task_id='A') + op2 = DummyOperator(task_id='B') + op3 = DummyOperator(task_id='C') + op4 = DummyOperator(task_id='D') + op1.set_upstream([op2, op3]) + op3.set_upstream(op4) + + dag.clear() + + now = datetime.datetime.now() + dr = dag.create_dagrun(run_id='test_dagrun_success_conditions', + state=State.RUNNING, + execution_date=now, + start_date=now) + + # op1 = root + ti_op1 = dr.get_task_instance(task_id=op1.task_id) + ti_op1.set_state(state=State.SUCCESS, session=session) + + ti_op2 = dr.get_task_instance(task_id=op2.task_id) + ti_op3 = dr.get_task_instance(task_id=op3.task_id) + ti_op4 = dr.get_task_instance(task_id=op4.task_id) + + # root is successful, but unfinished tasks + state = dr.update_state() + self.assertEqual(State.RUNNING, state) + + # one has failed, but root is successful + ti_op2.set_state(state=State.FAILED, session=session) + ti_op3.set_state(state=State.SUCCESS, session=session) + ti_op4.set_state(state=State.SUCCESS, session=session) + state = dr.update_state() + self.assertEqual(State.SUCCESS, state) + + # upstream dependency failed, root has not run + ti_op1.set_state(State.NONE, session) + state = dr.update_state() + self.assertEqual(State.FAILED, state) + class DagBagTest(unittest.TestCase):
