[AIRFLOW-747] Fix retry_delay not being honoured. DAG runs were marked as deadlocked even though a task was still up for retry and within its retry_delay period. In addition, _execute_task_instances was picking up tasks in the UP_FOR_RETRY state directly from the database, bypassing the dependency check; tasks that pass the dependency check are set to SCHEDULED, so only SCHEDULED tasks should be picked up.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/68f484cd Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/68f484cd Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/68f484cd Branch: refs/heads/v1-8-test Commit: 68f484cd6073351ccd01d60279a982cfa272cfa5 Parents: e0f5c0c Author: Bolke de Bruin <[email protected]> Authored: Fri Jan 13 12:25:26 2017 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Fri Jan 13 20:52:07 2017 +0100 ---------------------------------------------------------------------- airflow/jobs.py | 11 +++++++++-- airflow/models.py | 10 ++++++++-- airflow/ti_deps/dep_context.py | 4 ++++ airflow/ti_deps/deps/not_in_retry_period_dep.py | 6 ++++++ tests/jobs.py | 10 ++++++++-- 5 files changed, 35 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/68f484cd/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 819d107..6906625 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -875,6 +875,7 @@ class SchedulerJob(BaseJob): .query(models.TaskInstance) .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids)) .filter(models.TaskInstance.state.in_(old_states)) + .with_for_update() .all() ) """:type: list[TaskInstance]""" @@ -1050,6 +1051,13 @@ class SchedulerJob(BaseJob): .format(task_instance.key, priority, queue)) # Set the state to queued + task_instance.refresh_from_db(lock_for_update=True, session=session) + if task_instance.state not in states: + self.logger.info("Task {} was set to {} outside this scheduler." 
+ .format(task_instance.key, task_instance.state)) + session.commit() + continue + self.logger.info("Setting state of {} to {}".format( task_instance.key, State.QUEUED)) task_instance.state = State.QUEUED @@ -1393,8 +1401,7 @@ class SchedulerJob(BaseJob): State.NONE) self._execute_task_instances(simple_dag_bag, - (State.SCHEDULED, - State.UP_FOR_RETRY)) + (State.SCHEDULED,)) # Call hearbeats self.logger.info("Heartbeating the executor") http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/68f484cd/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 0bd744e..85abdfe 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -3693,6 +3693,7 @@ class DagRun(Base): ID_PREFIX = 'scheduled__' ID_FORMAT_PREFIX = ID_PREFIX + '{0}' + DEADLOCK_CHECK_DEP_CONTEXT = DepContext(ignore_in_retry_period=True) id = Column(Integer, primary_key=True) dag_id = Column(String(ID_LEN)) @@ -3879,8 +3880,13 @@ class DagRun(Base): # small speed up if unfinished_tasks and none_depends_on_past: # todo: this can actually get pretty slow: one task costs between 0.01-015s - no_dependencies_met = all(not t.are_dependencies_met(session=session) - for t in unfinished_tasks) + no_dependencies_met = all( + # Use a special dependency context that ignores task's up for retry + # dependency, since a task that is up for retry is not necessarily + # deadlocked. + not t.are_dependencies_met(dep_context=self.DEADLOCK_CHECK_DEP_CONTEXT, + session=session) + for t in unfinished_tasks) duration = (datetime.now() - start_dttm).total_seconds() * 1000 Stats.timing("dagrun.dependency-check.{}.{}". 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/68f484cd/airflow/ti_deps/dep_context.py ---------------------------------------------------------------------- diff --git a/airflow/ti_deps/dep_context.py b/airflow/ti_deps/dep_context.py index 583099d..01e01dd 100644 --- a/airflow/ti_deps/dep_context.py +++ b/airflow/ti_deps/dep_context.py @@ -49,6 +49,8 @@ class DepContext(object): :param ignore_depends_on_past: Ignore depends_on_past parameter of DAGs (e.g. for Backfills) :type ignore_depends_on_past: boolean + :param ignore_in_retry_period: Ignore the retry period for task instances + :type ignore_in_retry_period: boolean :param ignore_task_deps: Ignore task-specific dependencies such as depends_on_past and trigger rule :type ignore_task_deps: boolean @@ -61,12 +63,14 @@ class DepContext(object): flag_upstream_failed=False, ignore_all_deps=False, ignore_depends_on_past=False, + ignore_in_retry_period=False, ignore_task_deps=False, ignore_ti_state=False): self.deps = deps or set() self.flag_upstream_failed = flag_upstream_failed self.ignore_all_deps = ignore_all_deps self.ignore_depends_on_past = ignore_depends_on_past + self.ignore_in_retry_period = ignore_in_retry_period self.ignore_task_deps = ignore_task_deps self.ignore_ti_state = ignore_ti_state http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/68f484cd/airflow/ti_deps/deps/not_in_retry_period_dep.py ---------------------------------------------------------------------- diff --git a/airflow/ti_deps/deps/not_in_retry_period_dep.py b/airflow/ti_deps/deps/not_in_retry_period_dep.py index 8305094..05dceac 100644 --- a/airflow/ti_deps/deps/not_in_retry_period_dep.py +++ b/airflow/ti_deps/deps/not_in_retry_period_dep.py @@ -25,6 +25,12 @@ class NotInRetryPeriodDep(BaseTIDep): @provide_session def _get_dep_statuses(self, ti, session, dep_context): + if dep_context.ignore_in_retry_period: + yield self._passing_status( + reason="The context specified that being in a retry period was " + 
"permitted.") + raise StopIteration + if ti.state != State.UP_FOR_RETRY: yield self._passing_status( reason="The task instance was not marked for retrying.") http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/68f484cd/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 32c615d..3d70415 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -913,7 +913,8 @@ class SchedulerJobTest(unittest.TestCase): dag = DAG( dag_id='test_retry_still_in_executor', - start_date=DEFAULT_DATE) + start_date=DEFAULT_DATE, + schedule_interval="@once") dag_task1 = BashOperator( task_id='test_retry_handling_op', bash_command='exit 1', @@ -963,11 +964,16 @@ class SchedulerJobTest(unittest.TestCase): self.assertEqual(ti.state, State.UP_FOR_RETRY) self.assertEqual(ti.try_number, 1) + ti.refresh_from_db(lock_for_update=True, session=session) + ti.state = State.SCHEDULED + session.merge(ti) + session.commit() + # do not schedule do_schedule() self.assertTrue(executor.has_task(ti)) ti.refresh_from_db() - self.assertEqual(ti.state, State.UP_FOR_RETRY) + self.assertEqual(ti.state, State.SCHEDULED) # now the executor has cleared and it should be allowed the re-queue executor.queued_tasks.clear()
