[AIRFLOW-897] Prevent dagruns from failing with unfinished tasks Closes #2099 from aoen/ddavydov/fix_premature_dagrun_failures
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c29af466 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c29af466 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c29af466 Branch: refs/heads/v1-8-test Commit: c29af4668a67b5d7f969140549558714fb7b32c9 Parents: ff0fa00 Author: Dan Davydov <[email protected]> Authored: Fri Feb 24 14:29:11 2017 -0800 Committer: Bolke de Bruin <[email protected]> Committed: Sun Mar 12 08:17:40 2017 -0700 ---------------------------------------------------------------------- airflow/models.py | 6 +++--- tests/dags/test_issue_1225.py | 13 +++++++++++++ tests/jobs.py | 24 ++++++++++++++++++++++++ 3 files changed, 40 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c29af466/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 1829ff3..3fef407 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -3993,12 +3993,12 @@ class DagRun(Base): # future: remove the check on adhoc tasks (=active_tasks) if len(tis) == len(dag.active_tasks): - # if any roots failed, the run failed root_ids = [t.task_id for t in dag.roots] roots = [t for t in tis if t.task_id in root_ids] - if any(r.state in (State.FAILED, State.UPSTREAM_FAILED) - for r in roots): + # if all roots finished and at least one failed, the run failed + if (not unfinished_tasks and + any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)): logging.info('Marking run {} failed'.format(self)) self.state = State.FAILED http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c29af466/tests/dags/test_issue_1225.py ---------------------------------------------------------------------- diff --git a/tests/dags/test_issue_1225.py 
b/tests/dags/test_issue_1225.py index 021561f..d01fd79 100644 --- a/tests/dags/test_issue_1225.py +++ b/tests/dags/test_issue_1225.py @@ -129,3 +129,16 @@ dag7_subdag1 = SubDagOperator( subdag=subdag7) subdag7_task1.set_downstream(subdag7_task2) subdag7_task2.set_downstream(subdag7_task3) + +# DAG tests that a Dag run that doesn't complete but has a root failure is marked running +dag8 = DAG(dag_id='test_dagrun_states_root_fail_unfinished', default_args=default_args) +dag8_task1 = DummyOperator( + task_id='test_dagrun_unfinished', # The test will unset the task instance state after + # running this test + dag=dag8, +) +dag8_task2 = PythonOperator( + task_id='test_dagrun_fail', + dag=dag8, + python_callable=fail, +) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c29af466/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index e520b44..1f7950e 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -358,6 +358,30 @@ class SchedulerJobTest(unittest.TestCase): }, dagrun_state=State.FAILED) + def test_dagrun_root_fail_unfinished(self): + """ + DagRuns with one unfinished and one failed root task -> RUNNING + """ + # Run both the failed and successful tasks + scheduler = SchedulerJob(**self.default_scheduler_args) + dag_id = 'test_dagrun_states_root_fail_unfinished' + dag = self.dagbag.get_dag(dag_id) + dag.clear() + dr = scheduler.create_dag_run(dag) + try: + dag.run(start_date=dr.execution_date, end_date=dr.execution_date) + except AirflowException: # Expect an exception since there is a failed task + pass + + # Mark the successful task as never having run since we want to see if the + # dagrun will be in a running state despite having an unfinished task. 
+ session = settings.Session() + ti = dr.get_task_instance('test_dagrun_unfinished', session=session) + ti.state = State.NONE + session.commit() + dr_state = dr.update_state() + self.assertEqual(dr_state, State.RUNNING) + def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self): """ DagRun is marked a success if ignore_first_depends_on_past=True
