sydykov opened a new issue, #27615:
URL: https://github.com/apache/airflow/issues/27615

   ### Apache Airflow version
   
   2.4.2
   
   ### What happened
   
   Since the refactor in v2.4, the logic in `handle_failure()` in `taskinstance.py` has changed: a task's state is now persisted (the corresponding session is merged and flushed) only *after* that task's on-failure/on-retry callbacks have been called. As a result, these callbacks effectively cannot change the task's state, because the state held in the session is written out after they return. For example, if a callback clears the task's state, the state is still set to `State.FAILED` or `State.UP_FOR_RETRY` when the session is flushed at the end of `handle_failure()`.
   
   Summary:
   1. Since v2.4, callbacks that clear the task's state no longer work as intended, because the altered state is overwritten afterwards.
   2. When the callbacks are called, the task's state in the database is outdated: for example, during an on-failure callback the task's state, as read through a new default session, is still `State.RUNNING`.
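To make the ordering problem concrete, here is a minimal sketch that models it without Airflow. `TaskInstanceModel`, `clearing_callback`, `handle_failure_v24` and the dict standing in for the database are all hypothetical illustrations, not Airflow APIs; `flush()` simply copies the in-memory state into the dict, roughly like `session.merge(self)` + `session.flush()`.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional

# Hypothetical in-memory "database": task_id -> persisted state (None = cleared).
Rows = Dict[str, Optional[str]]


@dataclass
class TaskInstanceModel:
    """Hypothetical stand-in for a TaskInstance held in the ORM session."""

    task_id: str
    state: Optional[str] = "running"

    def flush(self, db: Rows) -> None:
        # Persist the in-memory state, like session.merge(self) + session.flush().
        db[self.task_id] = self.state


def clearing_callback(ti: TaskInstanceModel, db: Rows) -> Optional[str]:
    # Read the persisted state, then "clear" the task by writing straight to the
    # database (roughly what a clear() call in a real callback does via its own session).
    seen = db[ti.task_id]
    db[ti.task_id] = None
    return seen


def handle_failure_v24(
    ti: TaskInstanceModel,
    db: Rows,
    callback: Callable[[TaskInstanceModel, Rows], Optional[str]],
) -> Optional[str]:
    # Ordering reported for v2.4: set the state, run the callback, then flush.
    ti.state = "failed"
    seen = callback(ti, db)
    ti.flush(db)  # overwrites whatever the callback wrote
    return seen


db: Rows = {"t1": "running"}
seen = handle_failure_v24(TaskInstanceModel("t1"), db, clearing_callback)
print(seen, db["t1"])  # running failed
```

The callback sees the stale `running` state, and its "clear" is lost because the later flush writes `failed` over it, which matches both points of the summary.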
   
   ---
   The behavior is controlled by the following code:
   
https://github.com/apache/airflow/blob/febf35500d5de172e25280e5f5492257f898fdf5/airflow/models/taskinstance.py#L1916-L1921
   
   This is what it used to look like in v2.3.4 (conveyed via very short snippets):
   1. The session was flushed inside `handle_failure()`:
   
https://github.com/apache/airflow/blob/88b274c95b212b541ba19918880ae425856212be/airflow/models/taskinstance.py#L1959-L1961
   2. The callbacks were called after the session had been flushed:
https://github.com/apache/airflow/blob/88b274c95b212b541ba19918880ae425856212be/airflow/models/taskinstance.py#L1971-L1972
   
   ### What you think should happen instead
   
   The blocks here
   
https://github.com/apache/airflow/blob/febf35500d5de172e25280e5f5492257f898fdf5/airflow/models/taskinstance.py#L1916-L1921
   should be swapped so that the code looks like this:
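   ```python
   if not test_mode:
       # Persist the FAILED / UP_FOR_RETRY state first...
       session.merge(self)
       session.flush()

   if callback and context:
       # ...so that callbacks see the new state and can still change it.
       self._run_finished_callback(callback, context, callback_type)
   ```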
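A minimal sketch of why the swap helps, using the same kind of toy model: `TaskInstanceModel`, `clearing_callback`, `handle_failure_proposed` and the dict "database" are hypothetical stand-ins, not Airflow APIs, and the model ignores anything the real session might do after the callback returns.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional

# Hypothetical in-memory "database": task_id -> persisted state (None = cleared).
Rows = Dict[str, Optional[str]]


@dataclass
class TaskInstanceModel:
    """Hypothetical stand-in for a TaskInstance held in the ORM session."""

    task_id: str
    state: Optional[str] = "running"

    def flush(self, db: Rows) -> None:
        # Persist the in-memory state, like session.merge(self) + session.flush().
        db[self.task_id] = self.state


def clearing_callback(ti: TaskInstanceModel, db: Rows) -> Optional[str]:
    # Record what the callback sees in the database, then "clear" the task.
    seen = db[ti.task_id]
    db[ti.task_id] = None
    return seen


def handle_failure_proposed(
    ti: TaskInstanceModel,
    db: Rows,
    callback: Callable[[TaskInstanceModel, Rows], Optional[str]],
    test_mode: bool = False,
) -> Optional[str]:
    ti.state = "failed"
    if not test_mode:
        ti.flush(db)  # persist FAILED first...
    return callback(ti, db)  # ...so the callback sees it and its changes stick


db: Rows = {"t1": "running"}
seen = handle_failure_proposed(TaskInstanceModel("t1"), db, clearing_callback)
print(seen, db["t1"])  # failed None
```

With the flush moved before the callback, the callback observes `failed` and its "clear" is the last write, so it is preserved.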
   
   ### How to reproduce
   
   Create a simple on-failure callback that clears the task's state like so:
   
   ```python
   import logging

   from airflow.models import DagRun, TaskInstance


   def our_simple_callback(context: dict) -> None:
       logical_date = context["logical_date"]
       ti: TaskInstance = context["ti"]
       dag_run: DagRun = context["dag_run"]
       logging.info(
           "The current state of the task '%s' in the database is '%s'.",
           ti.task_id,
           ti.current_state(),
       )
       # Clear this task (and its downstream tasks) for the current logical date.
       dag_run.get_dag().task_dict[ti.task_id].clear(
           start_date=logical_date, end_date=logical_date, downstream=True
       )
   ```
   
   Use it as the `on_failure_callback` of any task that is expected to fail, and observe that the task is not started again after the callback runs. Also note the task's state in the database, as reported by the log message.
   
   ### Operating System
   
   I am using macOS Monterey 12.6.1, and the container is built from the slim Airflow base image.
   
   ### Versions of Apache Airflow Providers
   
   Irrelevant.
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   