tirkarthi opened a new issue, #56704: URL: https://github.com/apache/airflow/issues/56704
### Apache Airflow version

main (development)

### If "Other Airflow 2/3 version" selected, which one?

_No response_

### What happened?

When a task resumes from the deferred state, it runs the method named in `method_name`. If that method raises an exception, the task instance keeps its `next_method` and `next_kwargs`, so every retry re-enters the same callback and fails again instead of starting over from `execute()`. These values should be cleared on task retry and on the other terminal states. Something like the sample patch below should work; a standalone sketch of the same clearing pattern is included after this report.

https://github.com/apache/airflow/blob/c28b21178b9c8c299a8ec5c74eff39d7e1da99b9/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py#L436-L453

Sample patch:

```diff
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index f2f8fc8e11..67af09eeb3 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -437,7 +437,7 @@ def _create_ti_state_update_query_and_update_state(
     ti = session.get(TI, ti_id_str)
     updated_state = ti_patch_payload.state
     query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
-    query = query.values(state=updated_state)
+    query = query.values(state=updated_state, next_method=None, next_kwargs=None)

     if updated_state == TerminalTIState.FAILED:
         # This is the only case needs extra handling for TITerminalStatePayload
```

### What you think should happen instead?

_No response_

### How to reproduce

1. Run the DAG below.
2. Create `/tmp/a` so the trigger succeeds.
3. Wait for the task to fail with the exception raised from `complete_file_sensor`, then remove `/tmp/a`.
4. The retry does not defer the task again; it fails instantly because the persisted `next_method` and `next_kwargs` are reused.

```python
from __future__ import annotations

from datetime import datetime, timedelta

from airflow.exceptions import AirflowException
from airflow.models.baseoperator import BaseOperator
from airflow.providers.standard.triggers.file import FileTrigger
from airflow.sdk import DAG


class FileSensor(BaseOperator):
    def __init__(self, *, filepath, **kwargs):
        super().__init__(**kwargs)
        self.filepath = filepath

    def execute(self, context):
        self.defer(trigger=FileTrigger(filepath=self.filepath), method_name="complete_file_sensor")

    def complete_file_sensor(self, context, event=None):
        raise AirflowException(event)


with DAG(
    "retry_defer_next_method",
    start_date=datetime(2025, 3, 1, 0, 0, 0),
    schedule="@daily",
    catchup=False,
    default_args={"retries": 4, "retry_delay": timedelta(seconds=10)},
) as dag:
    file_task = FileSensor(task_id="file_task", filepath="/tmp/a")

    file_task
```

### Operating System

Ubuntu 20.04

### Versions of Apache Airflow Providers

_No response_

### Deployment

Virtualenv installation

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
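For illustration, here is a minimal, self-contained sketch of the clearing pattern proposed in the patch above, written against plain SQLAlchemy rather than Airflow's actual models. The `task_instance` table definition and the `build_terminal_state_update` helper are hypothetical simplifications of the real `TaskInstance` model and `_create_ti_state_update_query_and_update_state`; only the idea of nulling `next_method`/`next_kwargs` in the same UPDATE that writes the terminal state comes from the issue.

```python
# Hypothetical sketch only: simplified table and helper, not Airflow's real models.
from sqlalchemy import JSON, Column, MetaData, String, Table, update

metadata = MetaData()

# Simplified stand-in for the task_instance table, keeping only the columns
# relevant to this issue.
task_instance = Table(
    "task_instance",
    metadata,
    Column("id", String, primary_key=True),
    Column("state", String),
    Column("next_method", String),
    Column("next_kwargs", JSON),
)


def build_terminal_state_update(ti_id: str, new_state: str):
    """Build an UPDATE that writes a terminal state and clears deferral state.

    Clearing next_method/next_kwargs in the same statement means a subsequent
    retry starts from execute() again instead of replaying the stale deferral
    callback.
    """
    return (
        update(task_instance)
        .where(task_instance.c.id == ti_id)
        .values(state=new_state, next_method=None, next_kwargs=None)
    )
```

Compiling the statement (e.g. `print(build_terminal_state_update("ti-1", "failed"))`) shows a single UPDATE that sets `state`, `next_method`, and `next_kwargs` in one round trip, which is the behaviour the sample patch adds to the execution API route.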
