[AIRFLOW-1036] Randomize exponential backoff

This prevents the thundering herd problem. Hashing a combination of
dag_id, task_id, execution_date, and try_number makes the delay
pseudo-random with respect to task instances, while still being
deterministic across machines. The retry delay falls within a range
that doubles in size with each try.

Closes #2262 from saguziel/aguziel-random-exponential-backoff


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/258baf0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/258baf0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/258baf0f

Branch: refs/heads/v1-8-test
Commit: 258baf0ff0af9529388a464174dc703d0ad48f5b
Parents: 6503995
Author: Alex Guziel <[email protected]>
Authored: Sat Apr 29 17:11:58 2017 +0200
Committer: Maxime Beauchemin <[email protected]>
Committed: Thu Jun 8 08:36:20 2017 -0700

----------------------------------------------------------------------
 airflow/models.py |  8 +++++++-
 tests/models.py   | 15 +++++++++++----
 2 files changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/258baf0f/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index bf50c4c..8ef9748 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1163,13 +1163,19 @@ class TaskInstance(Base):
         """
         delay = self.task.retry_delay
         if self.task.retry_exponential_backoff:
+            min_backoff = int(delay.total_seconds() * (2 ** (self.try_number - 
2)))
+            # deterministic per task instance
+            hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id, 
self.task_id,
+                self.execution_date, 
self.try_number).encode('utf-8')).hexdigest(), 16)
+            # between 0.5 * delay * (2^retry_number) and 1.0 * delay * (2^retry_number)
+            modded_hash = min_backoff + hash % min_backoff
             # timedelta has a maximum representable value. The exponentiation
             # here means this value can be exceeded after a certain number
             # of tries (around 50 if the initial delay is 1s, even fewer if
             # the delay is larger). Cap the value here before creating a
             # timedelta object so the operation doesn't fail.
             delay_backoff_in_seconds = min(
-                delay.total_seconds() * (2 ** (self.try_number - 1)),
+                modded_hash,
                 timedelta.max.total_seconds() - 1
             )
             delay = timedelta(seconds=delay_backoff_in_seconds)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/258baf0f/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index b3b8c2a..759be47 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -877,18 +877,25 @@ class TaskInstanceTest(unittest.TestCase):
             owner='airflow',
             start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
         ti = TI(
-            task=task, execution_date=datetime.datetime.now())
+            task=task, execution_date=DEFAULT_DATE)
         ti.end_date = datetime.datetime.now()
 
         ti.try_number = 1
         dt = ti.next_retry_datetime()
-        self.assertEqual(dt, ti.end_date + delay)
+        # between 30 * 2^-1 and 30 * 2^0 (15 and 30)
+        self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=20.0))
+
+        ti.try_number = 4
+        dt = ti.next_retry_datetime()
+        # between 30 * 2^2 and 30 * 2^3 (120 and 240)
+        self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=181.0))
 
         ti.try_number = 6
         dt = ti.next_retry_datetime()
-        self.assertEqual(dt, ti.end_date + (2 ** 5) * delay)
+        # between 30 * 2^4 and 30 * 2^5 (480 and 960)
+        self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=825.0))
 
-        ti.try_number = 8
+        ti.try_number = 9
         dt = ti.next_retry_datetime()
         self.assertEqual(dt, ti.end_date+max_delay)
 

Reply via email to