Repository: incubator-airflow Updated Branches: refs/heads/airbnb_rb1.7.1_3 4a5f4a0ba -> 563be1324
Fix : Don't treat premature tasks as could_not_run tasks Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/563be132 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/563be132 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/563be132 Branch: refs/heads/airbnb_rb1.7.1_3 Commit: 563be1324d982f6033e7d087f41e070805aefa21 Parents: 4a5f4a0 Author: Siddharth Anand <[email protected]> Authored: Thu May 12 03:37:51 2016 +0000 Committer: Dan Davydov <[email protected]> Committed: Fri May 13 10:34:21 2016 -0700 ---------------------------------------------------------------------- airflow/jobs.py | 3 +++ airflow/models.py | 11 ++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/563be132/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 8a38086..cbd536f 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -525,7 +525,10 @@ class SchedulerJob(BaseJob): elif ti.is_runnable(flag_upstream_failed=True): self.logger.debug('Queuing task: {}'.format(ti)) queue.put((ti.key, pickle_id)) + elif ti.is_premature(): + continue else: + self.logger.debug('Adding task: {} to the COULD_NOT_RUN set'.format(ti)) could_not_run.add(ti) # this type of deadlock happens when dagruns can't even start and so http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/563be132/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 24ee0f5..3bad273 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -856,7 +856,7 @@ class TaskInstance(Base): if self.execution_date > datetime.now(): return False # is the task still in the retry waiting period? - elif self.state == State.UP_FOR_RETRY and not self.ready_for_retry(): + elif self.is_premature(): return False # does the task have an end_date prior to the execution date? elif self.task.end_date and self.execution_date > self.task.end_date: @@ -878,6 +878,15 @@ class TaskInstance(Base): else: return False + + def is_premature(self): + """ + Returns whether a task is in UP_FOR_RETRY state and its retry interval + has elapsed. + """ + # is the task still in the retry waiting period? + return self.state == State.UP_FOR_RETRY and not self.ready_for_retry() + def is_runnable( self, include_queued=False,
