Repository: incubator-airflow Updated Branches: refs/heads/master 2fa6905f4 -> 66168efa1
[AIRFLOW-1036] Randomize exponential backoff. This prevents the thundering herd problem. Hashing a combination of dag_id, task_id, execution_date, and try_number makes the delay vary pseudo-randomly between task instances, while still being deterministic across machines. For try number n, the retry delay falls between retry_delay * 2^(n-2) and retry_delay * 2^(n-1), so the range doubles in size with each retry. Closes #2262 from saguziel/aguziel-random-exponential-backoff Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/66168efa Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/66168efa Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/66168efa Branch: refs/heads/master Commit: 66168efa12de98a9c29b20e5cea28c7e34a2d90a Parents: 2fa6905 Author: Alex Guziel <[email protected]> Authored: Sat Apr 29 17:11:58 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Sat Apr 29 17:11:58 2017 +0200 ---------------------------------------------------------------------- airflow/models.py | 8 +++++++- tests/models.py | 15 +++++++++++---- 2 files changed, 18 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/66168efa/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 51beab8..d2f7894 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -1183,13 +1183,19 @@ class TaskInstance(Base): """ delay = self.task.retry_delay if self.task.retry_exponential_backoff: + min_backoff = int(delay.total_seconds() * (2 ** (self.try_number - 2))) + # deterministic per task instance + hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id, self.task_id, + self.execution_date, self.try_number).encode('utf-8')).hexdigest(), 16) + # between 0.5 * delay * (2^retry_number) and 1.0 * delay * (2^retry_number) + modded_hash = min_backoff + hash % min_backoff # 
timedelta has a maximum representable value. The exponentiation # here means this value can be exceeded after a certain number # of tries (around 50 if the initial delay is 1s, even fewer if # the delay is larger). Cap the value here before creating a # timedelta object so the operation doesn't fail. delay_backoff_in_seconds = min( - delay.total_seconds() * (2 ** (self.try_number - 1)), + modded_hash, timedelta.max.total_seconds() - 1 ) delay = timedelta(seconds=delay_backoff_in_seconds) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/66168efa/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index 2180896..49e5c75 100644 --- a/tests/models.py +++ b/tests/models.py @@ -838,18 +838,25 @@ class TaskInstanceTest(unittest.TestCase): owner='airflow', start_date=datetime.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( - task=task, execution_date=datetime.datetime.now()) + task=task, execution_date=DEFAULT_DATE) ti.end_date = datetime.datetime.now() ti.try_number = 1 dt = ti.next_retry_datetime() - self.assertEqual(dt, ti.end_date + delay) + # between 30 * 2^0.5 and 30 * 2^1 (15 and 30) + self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=20.0)) + + ti.try_number = 4 + dt = ti.next_retry_datetime() + # between 30 * 2^2 and 30 * 2^3 (120 and 240) + self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=181.0)) ti.try_number = 6 dt = ti.next_retry_datetime() - self.assertEqual(dt, ti.end_date + (2 ** 5) * delay) + # between 30 * 2^4 and 30 * 2^5 (480 and 960) + self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=825.0)) - ti.try_number = 8 + ti.try_number = 9 dt = ti.next_retry_datetime() self.assertEqual(dt, ti.end_date+max_delay)
