Repository: incubator-airflow Updated Branches: refs/heads/airbnb_rb1.7.1 173b19313 -> ce86e0319
Cherrypick bugfix ab5d445992617585a0ced1d81881a0728f49b13a Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ce86e031 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ce86e031 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ce86e031 Branch: refs/heads/airbnb_rb1.7.1 Commit: ce86e0319031a95bf14cb6b0b3edd4a4962adbdc Parents: 173b193 Author: Siddharth Anand <[email protected]> Authored: Thu May 12 03:37:51 2016 +0000 Committer: Dan Davydov <[email protected]> Committed: Fri May 13 10:32:53 2016 -0700 ---------------------------------------------------------------------- airflow/jobs.py | 5 +++++ airflow/models.py | 11 ++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ce86e031/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 34318f3..06436ef 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -513,6 +513,11 @@ class SchedulerJob(BaseJob): elif ti.is_runnable(flag_upstream_failed=True): self.logger.debug('Firing task: {}'.format(ti)) executor.queue_task_instance(ti, pickle_id=pickle_id) + elif ti.is_premature(): + continue + else: + self.logger.debug('Adding task: {} to the COULD_NOT_RUN set'.format(ti)) + could_not_run.add(ti) # Releasing the lock self.logger.debug("Unlocking DAG (scheduler_lock)") http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ce86e031/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index eeb1269..7754875 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -788,7 +788,7 @@ class TaskInstance(Base): if self.execution_date > datetime.now(): return False # is the task still in the retry waiting period? - elif self.state == State.UP_FOR_RETRY and not self.ready_for_retry(): + elif self.is_premature(): return False # does the task have an end_date prior to the execution date? elif self.task.end_date and self.execution_date > self.task.end_date: @@ -810,6 +810,15 @@ class TaskInstance(Base): else: return False + + def is_premature(self): + """ + Returns whether a task is in UP_FOR_RETRY state and its retry interval + has elapsed. + """ + # is the task still in the retry waiting period? + return self.state == State.UP_FOR_RETRY and not self.ready_for_retry() + def is_runnable( self, include_queued=False,
