sydykov opened a new issue, #27615: URL: https://github.com/apache/airflow/issues/27615
### Apache Airflow version

2.4.2

### What happened

Since the refactor in v2.4, the logic of `handle_failure()` in `taskinstance.py` has changed. A task's state is now persisted (the session is merged and flushed) only after the task's on-failure or on-retry callback has been called. As a result, these callbacks effectively cannot change the task's state, because that state is controlled by a session that is flushed after they run. For example, if a callback clears the task's state, the state is still set to `State.FAILED` or `State.UP_FOR_RETRY` when the session is flushed at the end of `handle_failure()`.

Summary:

1. Since v2.4, callbacks that clear the task's state no longer work as intended, because the altered state is overwritten afterwards.
2. When the callbacks are called, the task's state in the database is outdated. For example, during an on-failure callback the task's state is still `State.RUNNING` (when queried through a new, default session).

---

The behavior is controlled by the following code:

https://github.com/apache/airflow/blob/febf35500d5de172e25280e5f5492257f898fdf5/airflow/models/taskinstance.py#L1916-L1921

This is what it used to look like in v2.3.4 (shown via two short snippets):

1. The session was flushed inside `handle_failure()`: https://github.com/apache/airflow/blob/88b274c95b212b541ba19918880ae425856212be/airflow/models/taskinstance.py#L1959-L1961
2. The callbacks were called after that session had been flushed: https://github.com/apache/airflow/blob/88b274c95b212b541ba19918880ae425856212be/airflow/models/taskinstance.py#L1971-L1972

### What you think should happen instead

The two blocks at https://github.com/apache/airflow/blob/febf35500d5de172e25280e5f5492257f898fdf5/airflow/models/taskinstance.py#L1916-L1921 should be swapped so that the code looks like this:

```
if not test_mode:
    session.merge(self)
    session.flush()

if callback and context:
    self._run_finished_callback(callback, context, callback_type)
```

### How to reproduce

Create a simple on-failure callback that clears the task's state, for example:

```
import logging

from airflow.models import DagRun, TaskInstance


def our_simple_callback(context: dict) -> None:
    logical_date = context["logical_date"]
    ti: TaskInstance = context["ti"]
    dag_run: DagRun = context["dag_run"]
    # Report the task's state as it is currently stored in the database.
    logging.info(f"The current state of the task '{ti.task_id}' in the database is '{ti.current_state()}'.")
    # Clear this task and its downstream tasks for the current logical date.
    dag_run.get_dag().task_dict[ti.task_id].clear(start_date=logical_date, end_date=logical_date, downstream=True)
```

Set it as the `on_failure_callback` of any task that is expected to fail, and observe that the task is not started again after the callback runs. Also note the task's state in the database, as reported by the log message.

### Operating System

I am using macOS Monterey 12.6.1, and the container is built from the slim Airflow base image.

### Versions of Apache Airflow Providers

Irrelevant.

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### Anything else

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
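To make the ordering problem described under "What happened" concrete, here is a toy model using only Python's standard `sqlite3` module. It is a hypothetical sketch, not Airflow code: the `task_instance` table, `persist_failure()` (a stand-in for `session.merge(self)` plus `session.flush()`), and `clearing_callback()` (a stand-in for the callback in "How to reproduce") are illustrative names, and a single connection replaces the separate sessions Airflow would use. When the callback runs first, it sees `running` and its cleared state is overwritten with `failed`; when the state is persisted first, the callback sees `failed` and its change survives.

```python
import sqlite3
from typing import List, Optional, Tuple

# Toy model (hypothetical, not Airflow code): one row per task instance,
# holding only its state. A state of None means "cleared".


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE task_instance (task_id TEXT PRIMARY KEY, state TEXT)")
    conn.execute("INSERT INTO task_instance VALUES ('t1', 'running')")
    conn.commit()
    return conn


def db_state(conn: sqlite3.Connection) -> Optional[str]:
    row = conn.execute("SELECT state FROM task_instance WHERE task_id = 't1'").fetchone()
    return row[0]


def clearing_callback(conn: sqlite3.Connection, seen: List[Optional[str]]) -> None:
    # Record the state the callback observes, then clear the task.
    seen.append(db_state(conn))
    conn.execute("UPDATE task_instance SET state = NULL WHERE task_id = 't1'")
    conn.commit()


def persist_failure(conn: sqlite3.Connection) -> None:
    # Stand-in for merge + flush: write the in-memory state ('failed') back.
    conn.execute("UPDATE task_instance SET state = 'failed' WHERE task_id = 't1'")
    conn.commit()


def handle_failure(callback_first: bool) -> Tuple[Optional[str], Optional[str]]:
    """Return (state seen by the callback, final state in the database)."""
    conn = make_db()
    seen: List[Optional[str]] = []
    if callback_first:  # order observed in 2.4.2
        clearing_callback(conn, seen)
        persist_failure(conn)
    else:  # order proposed in this issue
        persist_failure(conn)
        clearing_callback(conn, seen)
    return seen[0], db_state(conn)


print(handle_failure(callback_first=True))   # ('running', 'failed')
print(handle_failure(callback_first=False))  # ('failed', None)
```

In this model the final state differs only because of the write order, which is the point of the proposed swap.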
