nicklip opened a new issue, #30228:
URL: https://github.com/apache/airflow/issues/30228

   ### Apache Airflow version
   
   2.5.2
   
   ### What happened
   
   The failure callback called the `clear_task_instances` function, which failed to clear the task instance passed to it.
   
   ### What you think should happen instead
   
   `clear_task_instances` should have cleared the task instance passed to it, but it did not. Instead, the task's state was set to `failed` and the task was never cleared or restarted.
   
   ### How to reproduce
   
   Run the following DAG and observe that `failing_task` only fails and is never cleared, restarted, or retried.
   Looking at the XCom list, you will see that the task instance object itself was in the `running` state before the `clear_task_instances` call and in the `restarting` state after it. However, when monitoring the `task_instance` table in the database while this DAG runs, the task's state only goes from `running` to `failed` and is never actually set to `restarting`. In version 2.3.1, the same code works as expected: the task state in the `task_instance` table goes from `running` to `failed` to `null`, successfully clearing it.
   
   ```python
   import random
   from datetime import datetime
   from time import sleep

   from airflow import DAG
   from airflow.models.taskinstance import clear_task_instances
   from airflow.operators.python import PythonOperator
   from airflow.utils.session import NEW_SESSION, provide_session


   @provide_session
   def clear_tasks_fn(tis, session=NEW_SESSION, dag=None):
       sleep_time = random.uniform(30.0, 60.0)  # sleep between 30 seconds and 1 minute
       sleep(sleep_time)
       for ti in tis:
           ti.xcom_push(key="before_state", value=ti.state)
       clear_task_instances(tis=tis, session=session, dag=dag)
       for ti in tis:
           session.commit()
           ti.xcom_push(key="after_state", value=ti.state)


   def clear_tasks_callback(context):
       all_tasks = context["dag_run"].get_task_instances()
       dag_inner = context["dag"]
       task_ids_to_clear = context["params"].get("task_ids_to_clear", [])

       tasks_to_clear = [ti for ti in all_tasks if ti.task_id in task_ids_to_clear]

       clear_tasks_fn(tasks_to_clear, dag=dag_inner)


   def failing_task_fn():
       raise Exception("Forcing task to fail")


   def succeeding_task_fn():
       sleep_time = random.uniform(10.0, 20.0)  # sleep between 10 and 20 seconds
       sleep(sleep_time)


   default_args = {
       "owner": "revenge",
       "start_date": datetime(2022, 10, 1),
   }
   schedule = "@once"

   the_dag = DAG(
       "clear_task_test",
       schedule_interval=schedule,
       default_args=default_args,
       catchup=False,
       is_paused_upon_creation=True,
   )

   with the_dag:
       failing_task_t = PythonOperator(
           task_id="failing_task",
           python_callable=failing_task_fn,
           params={"task_ids_to_clear": ["failing_task"]},
           on_failure_callback=clear_tasks_callback,
       )

       succeeding_task_t = PythonOperator(
           task_id="succeeding_task",
           python_callable=succeeding_task_fn,
       )

   failing_task_t.set_downstream(succeeding_task_t)
   ```
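   To watch the transition described above, the `task_instance` table can be polled while the DAG runs. A minimal sketch of the query, using an in-memory SQLite toy schema as a stand-in for the real metadata database (in this Docker-Compose deployment that would be the database container instead):

   ```python
   import sqlite3

   # Toy stand-in for the Airflow metadata DB; only the columns queried below.
   conn = sqlite3.connect(":memory:")
   conn.execute("CREATE TABLE task_instance (task_id TEXT, dag_id TEXT, state TEXT)")
   conn.execute(
       "INSERT INTO task_instance VALUES ('failing_task', 'clear_task_test', 'failed')"
   )

   # The query to run repeatedly against the real metadata database while the
   # DAG executes, to observe the state sequence reported in this issue.
   row = conn.execute(
       "SELECT task_id, state FROM task_instance "
       "WHERE dag_id = 'clear_task_test' AND task_id = 'failing_task'"
   ).fetchone()
   print(row)
   ```

   On 2.5.2 the observed state stays `failed`; on 2.3.1 it eventually becomes `NULL` once the clear succeeds.
   
   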
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

