[AIRFLOW-1036] Randomize exponential backoff This prevents the thundering herd problem. Using a combination of dag_run, task_id, and execution_date makes this random with respect to task instances, while still being deterministic across machines. The retry delay is within a range that doubles in size.
Closes #2262 from saguziel/aguziel-random- exponential-backoff Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/258baf0f Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/258baf0f Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/258baf0f Branch: refs/heads/v1-8-test Commit: 258baf0ff0af9529388a464174dc703d0ad48f5b Parents: 6503995 Author: Alex Guziel <[email protected]> Authored: Sat Apr 29 17:11:58 2017 +0200 Committer: Maxime Beauchemin <[email protected]> Committed: Thu Jun 8 08:36:20 2017 -0700 ---------------------------------------------------------------------- airflow/models.py | 8 +++++++- tests/models.py | 15 +++++++++++---- 2 files changed, 18 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/258baf0f/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index bf50c4c..8ef9748 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -1163,13 +1163,19 @@ class TaskInstance(Base): """ delay = self.task.retry_delay if self.task.retry_exponential_backoff: + min_backoff = int(delay.total_seconds() * (2 ** (self.try_number - 2))) + # deterministic per task instance + hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id, self.task_id, + self.execution_date, self.try_number).encode('utf-8')).hexdigest(), 16) + # between 0.5 * delay * (2^retry_number) and 1.0 * delay * (2^retry_number) + modded_hash = min_backoff + hash % min_backoff # timedelta has a maximum representable value. The exponentiation # here means this value can be exceeded after a certain number # of tries (around 50 if the initial delay is 1s, even fewer if # the delay is larger). Cap the value here before creating a # timedelta object so the operation doesn't fail. delay_backoff_in_seconds = min( - delay.total_seconds() * (2 ** (self.try_number - 1)), + modded_hash, timedelta.max.total_seconds() - 1 ) delay = timedelta(seconds=delay_backoff_in_seconds) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/258baf0f/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index b3b8c2a..759be47 100644 --- a/tests/models.py +++ b/tests/models.py @@ -877,18 +877,25 @@ class TaskInstanceTest(unittest.TestCase): owner='airflow', start_date=datetime.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( - task=task, execution_date=datetime.datetime.now()) + task=task, execution_date=DEFAULT_DATE) ti.end_date = datetime.datetime.now() ti.try_number = 1 dt = ti.next_retry_datetime() - self.assertEqual(dt, ti.end_date + delay) + # between 30 * 2^0.5 and 30 * 2^1 (15 and 30) + self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=20.0)) + + ti.try_number = 4 + dt = ti.next_retry_datetime() + # between 30 * 2^2 and 30 * 2^3 (120 and 240) + self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=181.0)) ti.try_number = 6 dt = ti.next_retry_datetime() - self.assertEqual(dt, ti.end_date + (2 ** 5) * delay) + # between 30 * 2^4 and 30 * 2^5 (480 and 960) + self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=825.0)) - ti.try_number = 8 + ti.try_number = 9 dt = ti.next_retry_datetime() self.assertEqual(dt, ti.end_date+max_delay)
