[AIRFLOW-759] Use previous dag_run to verify depend_on_past

The start_date and the schedule interval can be misaligned. The scheduler corrects for this misalignment automatically; the dependency checker, however, did not apply the same correction.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/89f0ca4a Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/89f0ca4a Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/89f0ca4a Branch: refs/heads/v1-8-test Commit: 89f0ca4abfa38b66d2e26788e353bfdd17772c52 Parents: 648bd4e Author: Bolke de Bruin <[email protected]> Authored: Sat Jan 14 14:31:09 2017 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Sat Jan 14 21:10:56 2017 +0100 ---------------------------------------------------------------------- airflow/models.py | 46 ++++++++++++++++------------ airflow/ti_deps/deps/prev_dagrun_dep.py | 4 +-- 2 files changed, 28 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/89f0ca4a/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index acb6667..d878457 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -1027,19 +1027,22 @@ class TaskInstance(Base): dag = self.task.dag if dag: dr = self.get_dagrun(session=session) + + # LEGACY: most likely running from unit tests if not dr: # Means that this TI is NOT being run from a DR, but from a catchup previous_scheduled_date = dag.previous_schedule(self.execution_date) if not previous_scheduled_date: return None - else: - return TaskInstance(task=self.task, execution_date=previous_scheduled_date) - if dag.catchup: - last_dagrun = dr.get_previous_scheduled_dagrun(session=session) if dr else None + return TaskInstance(task=self.task, + execution_date=previous_scheduled_date) + dr.dag = dag + if dag.catchup: + last_dagrun = dr.get_previous_scheduled_dagrun(session=session) else: - last_dagrun = dr.get_previous_dagrun(session=session) if dr else None + last_dagrun = 
dr.get_previous_dagrun(session=session) if last_dagrun: return last_dagrun.get_task_instance(self.task_id, session=session) @@ -1066,16 +1069,21 @@ class TaskInstance(Base): :type verbose: boolean """ dep_context = dep_context or DepContext() + failed = False for dep_status in self.get_failed_dep_statuses( dep_context=dep_context, session=session): + failed = True if verbose: - logging.warning( - "Dependencies not met for %s, dependency '%s' FAILED: %s", - self, dep_status.dep_name, dep_status.reason) + logging.info("Dependencies not met for {}, dependency '{}' FAILED: {}" + .format(self, dep_status.dep_name, dep_status.reason)) + + if failed: return False + if verbose: - logging.info("Dependencies all met for %s", self) + logging.info("Dependencies all met for {}".format(self)) + return True @provide_session @@ -1089,12 +1097,14 @@ class TaskInstance(Base): self, session, dep_context): - if dep_status.passed: - logging.debug("%s dependency '%s' PASSED: %s", - self, - dep_status.dep_name, - dep_status.reason) - else: + + logging.debug("{} dependency '{}' PASSED: {}, {}" + .format(self, + dep_status.dep_name, + dep_status.passed, + dep_status.reason)) + + if not dep_status.passed: yield dep_status def __repr__(self): @@ -3882,13 +3892,11 @@ class DagRun(Base): @provide_session def get_previous_scheduled_dagrun(self, session=None): """The previous, SCHEDULED DagRun, if there is one""" - - if not self.dag: - return None + dag = self.get_dag() return session.query(DagRun).filter( DagRun.dag_id == self.dag_id, - DagRun.execution_date == self.dag.previous_schedule(self.execution_date) + DagRun.execution_date == dag.previous_schedule(self.execution_date) ).first() @provide_session http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/89f0ca4a/airflow/ti_deps/deps/prev_dagrun_dep.py ---------------------------------------------------------------------- diff --git a/airflow/ti_deps/deps/prev_dagrun_dep.py b/airflow/ti_deps/deps/prev_dagrun_dep.py index 
2fce704..7d4baa8 100644 --- a/airflow/ti_deps/deps/prev_dagrun_dep.py +++ b/airflow/ti_deps/deps/prev_dagrun_dep.py @@ -41,13 +41,11 @@ class PrevDagrunDep(BaseTIDep): # Don't depend on the previous task instance if we are the first task dag = ti.task.dag if dag.catchup: - if ti.execution_date == ti.task.start_date: + if dag.previous_schedule(ti.execution_date) < ti.task.start_date: yield self._passing_status( reason="This task instance was the first task instance for its task.") raise StopIteration - else: - dr = ti.get_dagrun() last_dagrun = dr.get_previous_dagrun() if dr else None
