Repository: incubator-airflow
Updated Branches:
  refs/heads/airbnb_rb1.7.1 173b19313 -> ce86e0319


Cherrypick bugfix ab5d445992617585a0ced1d81881a0728f49b13a


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ce86e031
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ce86e031
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ce86e031

Branch: refs/heads/airbnb_rb1.7.1
Commit: ce86e0319031a95bf14cb6b0b3edd4a4962adbdc
Parents: 173b193
Author: Siddharth Anand <[email protected]>
Authored: Thu May 12 03:37:51 2016 +0000
Committer: Dan Davydov <[email protected]>
Committed: Fri May 13 10:32:53 2016 -0700

----------------------------------------------------------------------
 airflow/jobs.py   |  5 +++++
 airflow/models.py | 11 ++++++++++-
 2 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ce86e031/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 34318f3..06436ef 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -513,6 +513,11 @@ class SchedulerJob(BaseJob):
             elif ti.is_runnable(flag_upstream_failed=True):
                 self.logger.debug('Firing task: {}'.format(ti))
                 executor.queue_task_instance(ti, pickle_id=pickle_id)
+            elif ti.is_premature():
+                continue
+            else:
+                self.logger.debug('Adding task: {} to the COULD_NOT_RUN set'.format(ti))
+                could_not_run.add(ti)
 
         # Releasing the lock
         self.logger.debug("Unlocking DAG (scheduler_lock)")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ce86e031/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index eeb1269..7754875 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -788,7 +788,7 @@ class TaskInstance(Base):
         if self.execution_date > datetime.now():
             return False
         # is the task still in the retry waiting period?
-        elif self.state == State.UP_FOR_RETRY and not self.ready_for_retry():
+        elif self.is_premature():
             return False
         # does the task have an end_date prior to the execution date?
         elif self.task.end_date and self.execution_date > self.task.end_date:
@@ -810,6 +810,15 @@ class TaskInstance(Base):
         else:
             return False
 
+
+    def is_premature(self):
+        """
+        Returns whether a task is in UP_FOR_RETRY state and its retry interval
+        has not yet elapsed (i.e. it is not ready for retry).
+        """
+        # is the task still in the retry waiting period?
+        return self.state == State.UP_FOR_RETRY and not self.ready_for_retry()
+
     def is_runnable(
             self,
             include_queued=False,

Reply via email to