Joffreybvn opened a new issue, #56184:
URL: https://github.com/apache/airflow/issues/56184

   ### Apache Airflow version
   
   3.1.0
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   1. Declare a MappedOperator (`.partial(...).expand(...)`) with `retry_exponential_backoff=True`.
   2. Make the task fail.
   3. The scheduler fails to reschedule the task and crashes:
   
   ```
   2025-09-26T17:01:49.607085Z [error    ] Exception when executing SchedulerJob._run_scheduler_loop [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:1046
   Traceback (most recent call last):
     File "/usr/local/lib/python3.13/site-packages/airflow/jobs/scheduler_job_runner.py", line 1042, in _execute
       self._run_scheduler_loop()
       ~~~~~~~~~~~~~~~~~~~~~~~~^^
     File "/usr/local/lib/python3.13/site-packages/airflow/jobs/scheduler_job_runner.py", line 1332, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/usr/local/lib/python3.13/site-packages/airflow/jobs/scheduler_job_runner.py", line 1442, in _do_scheduling
       callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
     File "/usr/local/lib/python3.13/site-packages/airflow/utils/retries.py", line 97, in wrapped_function
       for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
                      ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.13/site-packages/tenacity/__init__.py", line 445, in __iter__
       do = self.iter(retry_state=retry_state)
     File "/usr/local/lib/python3.13/site-packages/tenacity/__init__.py", line 378, in iter
       result = action(retry_state)
     File "/usr/local/lib/python3.13/site-packages/tenacity/__init__.py", line 400, in <lambda>
       self._add_action_func(lambda rs: rs.outcome.result())
                                        ~~~~~~~~~~~~~~~~~^^
     File "/usr/lib64/python3.13/concurrent/futures/_base.py", line 449, in result
       return self.__get_result()
              ~~~~~~~~~~~~~~~~~^^
     File "/usr/lib64/python3.13/concurrent/futures/_base.py", line 401, in __get_result
       raise self._exception
     File "/usr/local/lib/python3.13/site-packages/airflow/utils/retries.py", line 106, in wrapped_function
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.13/site-packages/airflow/jobs/scheduler_job_runner.py", line 1850, in _schedule_all_dag_runs
       callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
                                ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.13/site-packages/airflow/jobs/scheduler_job_runner.py", line 1966, in _schedule_dag_run
       schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
                                          ~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.13/site-packages/airflow/utils/session.py", line 98, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.13/site-packages/airflow/models/dagrun.py", line 1149, in update_state
       info = self.task_instance_scheduling_decisions(session)
     File "/usr/local/lib/python3.13/site-packages/airflow/utils/session.py", line 98, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.13/site-packages/airflow/models/dagrun.py", line 1329, in task_instance_scheduling_decisions
       schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
                                                          ~~~~~~~~~~~~~~~~~~~^
           schedulable_tis,
           ^^^^^^^^^^^^^^^^
           finished_tis,
           ^^^^^^^^^^^^^
           session=session,
           ^^^^^^^^^^^^^^^^
       )
       ^
     File "/usr/local/lib/python3.13/site-packages/airflow/models/dagrun.py", line 1519, in _get_ready_tis
       if not schedulable.are_dependencies_met(session=session, dep_context=dep_context):
              ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.13/site-packages/airflow/utils/session.py", line 98, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.13/site-packages/airflow/models/taskinstance.py", line 901, in are_dependencies_met
       for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, session=session):
                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.13/site-packages/airflow/models/taskinstance.py", line 924, in get_failed_dep_statuses
       for dep_status in dep.get_dep_statuses(self, session, dep_context):
                         ~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.13/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 116, in get_dep_statuses
       yield from self._get_dep_statuses(ti, session, cxt)
     File "/usr/local/lib/python3.13/site-packages/airflow/ti_deps/deps/not_in_retry_period_dep.py", line 48, in _get_dep_statuses
       next_task_retry_date = ti.next_retry_datetime()
     File "/usr/local/lib/python3.13/site-packages/airflow/models/taskinstance.py", line 989, in next_retry_datetime
       if self.task.max_retry_delay:
          ^^^^^^^^^^^^^^^^^^^^^^^^^
     File "<attrs generated getattr airflow.models.mappedoperator.MappedOperator>", line 11, in __getattr__
       return super().__getattribute__(item)
              ~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
   AttributeError: 'MappedOperator' object has no attribute 'max_retry_delay'. Did you mean: 'retry_delay'?
   ```
   
   Root cause: when [`retry_exponential_backoff` is set](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/taskinstance.py#L951), the scheduler [reads `max_retry_delay`](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/taskinstance.py#L989) from the task. That attribute is not defined on the [MappedOperator model](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/mappedoperator.py) (anymore?), so the lookup raises the `AttributeError` above.
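
To illustrate the failure mode in isolation, here is a minimal sketch that does not import Airflow. `FakeMappedOperator`, `capped_delay_strict`, and `capped_delay_defensive` are hypothetical names invented for this example, and the `min(...)` capping step is an assumption about what the attribute is used for. Only the `if task.max_retry_delay:` access is taken from the traceback. The defensive variant is one possible workaround, not necessarily the right fix (exposing the attribute on the mapped operator may be preferable).

```python
from datetime import timedelta


class FakeMappedOperator:
    """Hypothetical stand-in for a mapped task that lacks `max_retry_delay`."""

    retry_delay = timedelta(seconds=300)
    retry_exponential_backoff = True


def capped_delay_strict(task, delay):
    # Same access pattern as in the traceback: a direct attribute lookup,
    # which raises AttributeError when the attribute does not exist.
    if task.max_retry_delay:
        delay = min(task.max_retry_delay, delay)  # assumed capping step
    return delay


def capped_delay_defensive(task, delay):
    # Hypothetical workaround: treat a missing attribute as "no cap".
    max_delay = getattr(task, "max_retry_delay", None)
    if max_delay:
        delay = min(max_delay, delay)  # assumed capping step
    return delay


task = FakeMappedOperator()

try:
    capped_delay_strict(task, timedelta(seconds=600))
    strict_failed = False
except AttributeError:
    strict_failed = True

defensive_result = capped_delay_defensive(task, timedelta(seconds=600))
```

In this sketch the strict lookup fails the same way the scheduler does, while the defensive lookup returns the uncapped delay.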
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   Let a MappedOperator task declared with `retry_exponential_backoff=True` fail on its first try, so that the scheduler has to reschedule it for a retry.
   
   ### Operating System
   
   Fedora 42
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]