Repository: incubator-airflow Updated Branches: refs/heads/master 67b47c958 -> ea86895d5
[AIRFLOW-1420][AIRFLOW-1473] Fix deadlock check Update the deadlock check to prevent false positives on upstream failure or skip conditions. Closes #2506 from gwax/fix_dead_dagruns Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ea86895d Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ea86895d Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ea86895d Branch: refs/heads/master Commit: ea86895d5b81d6fed4f26c201f8874bacdd291e5 Parents: 67b47c9 Author: George Leslie-Waksman <[email protected]> Authored: Thu Aug 17 15:19:46 2017 -0700 Committer: Alex Guziel <[email protected]> Committed: Thu Aug 17 15:19:52 2017 -0700 ---------------------------------------------------------------------- airflow/models.py | 21 +++++++++------ airflow/ti_deps/deps/trigger_rule_dep.py | 4 +-- tests/models.py | 37 ++++++++++++++++++++++++--- 3 files changed, 48 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ea86895d/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 0b82c56..bf308e5 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -4231,7 +4231,6 @@ class DagRun(Base): ID_PREFIX = 'scheduled__' ID_FORMAT_PREFIX = ID_PREFIX + '{0}' - DEADLOCK_CHECK_DEP_CONTEXT = DepContext(ignore_in_retry_period=True) id = Column(Integer, primary_key=True) dag_id = Column(String(ID_LEN)) @@ -4457,13 +4456,19 @@ class DagRun(Base): # small speed up if unfinished_tasks and none_depends_on_past: # todo: this can actually get pretty slow: one task costs between 0.01-015s - no_dependencies_met = all( - # Use a special dependency context that ignores task's up for retry - # dependency, since a task that is up for retry is not necessarily - # deadlocked. - not t.are_dependencies_met(dep_context=self.DEADLOCK_CHECK_DEP_CONTEXT, - session=session) - for t in unfinished_tasks) + no_dependencies_met = True + for ut in unfinished_tasks: + # We need to flag upstream and check for changes because upstream + # failures can result in deadlock false positives + old_state = ut.state + deps_met = ut.are_dependencies_met( + dep_context=DepContext( + flag_upstream_failed=True, + ignore_in_retry_period=True), + session=session) + if deps_met or old_state != ut.current_state(session=session): + no_dependencies_met = False + break duration = (datetime.now() - start_dttm).total_seconds() * 1000 Stats.timing("dagrun.dependency-check.{}.{}". http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ea86895d/airflow/ti_deps/deps/trigger_rule_dep.py ---------------------------------------------------------------------- diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py index cf06c0b..5a80314 100644 --- a/airflow/ti_deps/deps/trigger_rule_dep.py +++ b/airflow/ti_deps/deps/trigger_rule_dep.py @@ -124,8 +124,8 @@ class TriggerRuleDep(BaseTIDep): tr = task.trigger_rule upstream_done = done >= upstream upstream_tasks_state = { - "successes": successes, "skipped": skipped, "failed": failed, - "upstream_failed": upstream_failed, "done": done + "total": upstream, "successes": successes, "skipped": skipped, + "failed": failed, "upstream_failed": upstream_failed, "done": done } # TODO(aoen): Ideally each individual trigger rules would be it's own class, but # this isn't very feasible at the moment since the database queries need to be http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ea86895d/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index 266e036..96275d3 100644 --- a/tests/models.py +++ b/tests/models.py @@ -37,6 +37,7 @@ from airflow.operators.python_operator import PythonOperator from airflow.operators.python_operator import ShortCircuitOperator from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep from airflow.utils.state import State +from airflow.utils.trigger_rule import TriggerRule from mock import patch from parameterized import parameterized @@ -483,10 +484,38 @@ class DagRunTest(unittest.TestCase): state = dr.update_state() self.assertEqual(State.SUCCESS, state) - # upstream dependency failed, root has not run - ti_op1.set_state(State.NONE, session) - state = dr.update_state() - self.assertEqual(State.FAILED, state) + def test_dagrun_deadlock(self): + session = settings.Session() + dag = DAG( + 'text_dagrun_deadlock', + start_date=DEFAULT_DATE, + default_args={'owner': 'owner1'}) + + with dag: + op1 = DummyOperator(task_id='A') + op2 = DummyOperator(task_id='B') + op2.trigger_rule = TriggerRule.ONE_FAILED + op2.set_upstream(op1) + + dag.clear() + now = datetime.datetime.now() + dr = dag.create_dagrun(run_id='test_dagrun_deadlock', + state=State.RUNNING, + execution_date=now, + start_date=now) + + ti_op1 = dr.get_task_instance(task_id=op1.task_id) + ti_op1.set_state(state=State.SUCCESS, session=session) + ti_op2 = dr.get_task_instance(task_id=op2.task_id) + ti_op2.set_state(state=State.NONE, session=session) + + dr.update_state() + self.assertEqual(dr.state, State.RUNNING) + + ti_op2.set_state(state=State.NONE, session=session) + op2.trigger_rule = 'invalid' + dr.update_state() + self.assertEqual(dr.state, State.FAILED) def test_get_task_instance_on_empty_dagrun(self): """
