This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push:
new ed08b530f8b [v3-0-test] Handle overflow in TaskInstance
`next_retry_datetime` (#48557) (#54460)
ed08b530f8b is described below
commit ed08b530f8b2fb3c5bee6cde0ec10b6bcf4d5eee
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Aug 13 14:10:52 2025 +0100
[v3-0-test] Handle overflow in TaskInstance `next_retry_datetime` (#48557)
(#54460)
This handles overflow when calculating the next execution time for a task
instance by falling back to the configured maximum delay. The solution uses the
same strategy that tenacity uses:
https://github.com/jd/tenacity/blob/main/tenacity/wait.py#L167
An alternate solution would be to determine the maximum number of tries that
wouldn't exceed the maximum delay and then not calculate the timeout for values
larger than that.
Something like
```
max_delay = self.task.max_retry_delay if self.task.max_retry_delay is not
None else MAX_RETRY_DELAY
tries_before_max_delay = math.floor(math.log2(max_delay))
if self.try_number <= tries_before_max_delay:
# existing logic
else:
delay = max_delay
```
(cherry picked from commit 33658f0ff86900a4b679a4506a78cc2c2af226f9)
closes: https://github.com/apache/airflow/issues/47971
Co-authored-by: perry2of5 <[email protected]>
---
airflow-core/src/airflow/models/taskinstance.py | 18 ++++++++++------
.../tests/unit/models/test_taskinstance.py | 25 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index 27b0305f8d4..ca4a40c47ba 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1265,12 +1265,18 @@ class TaskInstance(Base, LoggingMixin):
delay = self.task.retry_delay
if self.task.retry_exponential_backoff:
- # If the min_backoff calculation is below 1, it will be converted
to 0 via int. Thus,
- # we must round up prior to converting to an int, otherwise a
divide by zero error
- # will occur in the modded_hash calculation.
- # this probably gives unexpected results if a task instance has
previously been cleared,
- # because try_number can increase without bound
- min_backoff = math.ceil(delay.total_seconds() * (2 **
(self.try_number - 1)))
+ try:
+ # If the min_backoff calculation is below 1, it will be
converted to 0 via int. Thus,
+ # we must round up prior to converting to an int, otherwise a
divide by zero error
+ # will occur in the modded_hash calculation.
+ # this probably gives unexpected results if a task instance
has previously been cleared,
+ # because try_number can increase without bound
+ min_backoff = math.ceil(delay.total_seconds() * (2 **
(self.try_number - 1)))
+ except OverflowError:
+ min_backoff = MAX_RETRY_DELAY
+ self.log.warning(
+ "OverflowError occurred while calculating min_backoff,
using MAX_RETRY_DELAY for min_backoff."
+ )
# In the case when delay.total_seconds() is 0, min_backoff will
not be rounded up to 1.
# To address this, we impose a lower bound of 1 on min_backoff.
This effectively makes
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py
b/airflow-core/tests/unit/models/test_taskinstance.py
index 5d46281dbbe..6c5ff50a319 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -625,6 +625,31 @@ class TestTaskInstance:
date = ti.next_retry_datetime()
assert date == ti.end_date + max_delay
+ def test_next_retry_datetime_returns_max_for_overflow(self, dag_maker):
+ delay = datetime.timedelta(seconds=30)
+ max_delay = datetime.timedelta(minutes=60)
+
+ with dag_maker(dag_id="fail_dag"):
+ task = BashOperator(
+ task_id="task_with_exp_backoff_and_max_delay",
+ bash_command="exit 1",
+ retries=3,
+ retry_delay=delay,
+ retry_exponential_backoff=True,
+ max_retry_delay=max_delay,
+ )
+ ti = dag_maker.create_dagrun().task_instances[0]
+ ti.task = task
+ ti.end_date = pendulum.instance(timezone.utcnow())
+
+ ti.try_number = 5000
+ date = ti.next_retry_datetime()
+ assert date == ti.end_date + max_delay
+
+ ti.try_number = 50000
+ date = ti.next_retry_datetime()
+ assert date == ti.end_date + max_delay
+
@pytest.mark.parametrize("seconds", [0, 0.5, 1])
def test_next_retry_datetime_short_or_zero_intervals(self, dag_maker,
seconds):
delay = datetime.timedelta(seconds=seconds)