This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-0-test by this push:
     new ed08b530f8b [v3-0-test] Handle overflow in TaskInstance 
`next_retry_datetime` (#48557) (#54460)
ed08b530f8b is described below

commit ed08b530f8b2fb3c5bee6cde0ec10b6bcf4d5eee
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Aug 13 14:10:52 2025 +0100

    [v3-0-test] Handle overflow in TaskInstance `next_retry_datetime` (#48557) 
(#54460)
    
    This handles overflow when calculating the next execution time for a task 
instance by falling back to the configured maximum delay. The solution uses the 
same strategy that tenacity uses:
    https://github.com/jd/tenacity/blob/main/tenacity/wait.py#L167
    
    An alternate solution would be the determine the maximum tries that 
wouldn't exceed the maximum delay and then not calculate the timeout for values 
larger than that.
    
    Something like
    
    ```
    max_delay = self.task.max_retry_delay if self.task.max_retry_delay is not 
null else MAX_RETRY_DELAY
    tries_before_max_delay = math.floor(math.log2(max_delay))
    if self.try_number <= tries_before_max_delay:
         # existing logic
    else:
        delay = max_delay
    ```
    (cherry picked from commit 33658f0ff86900a4b679a4506a78cc2c2af226f9)
    
    
    closes: https://github.com/apache/airflow/issues/47971
    
    Co-authored-by: perry2of5 <[email protected]>
---
 airflow-core/src/airflow/models/taskinstance.py    | 18 ++++++++++------
 .../tests/unit/models/test_taskinstance.py         | 25 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 27b0305f8d4..ca4a40c47ba 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1265,12 +1265,18 @@ class TaskInstance(Base, LoggingMixin):
 
         delay = self.task.retry_delay
         if self.task.retry_exponential_backoff:
-            # If the min_backoff calculation is below 1, it will be converted 
to 0 via int. Thus,
-            # we must round up prior to converting to an int, otherwise a 
divide by zero error
-            # will occur in the modded_hash calculation.
-            # this probably gives unexpected results if a task instance has 
previously been cleared,
-            # because try_number can increase without bound
-            min_backoff = math.ceil(delay.total_seconds() * (2 ** 
(self.try_number - 1)))
+            try:
+                # If the min_backoff calculation is below 1, it will be 
converted to 0 via int. Thus,
+                # we must round up prior to converting to an int, otherwise a 
divide by zero error
+                # will occur in the modded_hash calculation.
+                # this probably gives unexpected results if a task instance 
has previously been cleared,
+                # because try_number can increase without bound
+                min_backoff = math.ceil(delay.total_seconds() * (2 ** 
(self.try_number - 1)))
+            except OverflowError:
+                min_backoff = MAX_RETRY_DELAY
+                self.log.warning(
+                    "OverflowError occurred while calculating min_backoff, 
using MAX_RETRY_DELAY for min_backoff."
+                )
 
             # In the case when delay.total_seconds() is 0, min_backoff will 
not be rounded up to 1.
             # To address this, we impose a lower bound of 1 on min_backoff. 
This effectively makes
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py 
b/airflow-core/tests/unit/models/test_taskinstance.py
index 5d46281dbbe..6c5ff50a319 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -625,6 +625,31 @@ class TestTaskInstance:
         date = ti.next_retry_datetime()
         assert date == ti.end_date + max_delay
 
+    def test_next_retry_datetime_returns_max_for_overflow(self, dag_maker):
+        delay = datetime.timedelta(seconds=30)
+        max_delay = datetime.timedelta(minutes=60)
+
+        with dag_maker(dag_id="fail_dag"):
+            task = BashOperator(
+                task_id="task_with_exp_backoff_and_max_delay",
+                bash_command="exit 1",
+                retries=3,
+                retry_delay=delay,
+                retry_exponential_backoff=True,
+                max_retry_delay=max_delay,
+            )
+        ti = dag_maker.create_dagrun().task_instances[0]
+        ti.task = task
+        ti.end_date = pendulum.instance(timezone.utcnow())
+
+        ti.try_number = 5000
+        date = ti.next_retry_datetime()
+        assert date == ti.end_date + max_delay
+
+        ti.try_number = 50000
+        date = ti.next_retry_datetime()
+        assert date == ti.end_date + max_delay
+
     @pytest.mark.parametrize("seconds", [0, 0.5, 1])
     def test_next_retry_datetime_short_or_zero_intervals(self, dag_maker, 
seconds):
         delay = datetime.timedelta(seconds=seconds)

Reply via email to