tirkarthi opened a new issue, #56704:
URL: https://github.com/apache/airflow/issues/56704

   ### Apache Airflow version
   
   main (development)
   
   ### If "Other Airflow 2/3 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   When a task resumes from deferred state and goes to the method mentioned in 
`method_name` to raise an exception the retries also use `next_method` and 
`next_kwargs` to fail again. During the task retry and other terminal states 
the values should be cleared. Something like the sample patch below
   
   
https://github.com/apache/airflow/blob/c28b21178b9c8c299a8ec5c74eff39d7e1da99b9/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py#L436-L453
   
   Sample patch :
   
   ```diff
   diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
   index f2f8fc8e11..67af09eeb3 100644
   --- 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
   +++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
   @@ -437,7 +437,7 @@ def _create_ti_state_update_query_and_update_state(
            ti = session.get(TI, ti_id_str)
            updated_state = ti_patch_payload.state
            query = TI.duration_expression_update(ti_patch_payload.end_date, 
query, session.bind)
   -        query = query.values(state=updated_state)
   +        query = query.values(state=updated_state, next_method=None, 
next_kwargs=None)
    
            if updated_state == TerminalTIState.FAILED:
                # This is the only case needs extra handling for 
TITerminalStatePayload
   
   ```
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   1. Run the below dag.
   2. Create `/tmp/a` for the trigger to succeed.
   3. Wait for the dag failure due to exception from `complete_file_sensor` and 
remove `/tmp/a`.
   4. Retry doesn't defer the task and fails instantly with the `next_kwargs` 
and `next_method` being persisted.
   
   ```python
   from __future__ import annotations
   
   from datetime import datetime, timedelta
   
   from airflow.exceptions import AirflowException
   from airflow.models.baseoperator import BaseOperator
   from airflow.providers.standard.triggers.file import FileTrigger
   from airflow.sdk import DAG
   
   
   class FileSensor(BaseOperator):
       def __init__(self, *, filepath, **kwargs):
           super().__init__(**kwargs)
           self.filepath = filepath
   
       def execute(self, context):
           self.defer(trigger=FileTrigger(filepath=self.filepath), 
method_name="complete_file_sensor")
   
       def complete_file_sensor(self, context, event=None):
           raise AirflowException(event)
   
   
   with DAG(
       "retry_defer_next_method",
       start_date=datetime(2025, 3, 1, 0, 0, 0),
       schedule="@daily",
       catchup=False,
       default_args={"retries": 4, "retry_delay": timedelta(seconds=10)},
   ) as dag:
       file_task = FileSensor(task_id="file_task", filepath="/tmp/a")
       file_task
   ```
   
   ### Operating System
   
   Ubuntu 20.04
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to