nicklip opened a new issue, #30228:
URL: https://github.com/apache/airflow/issues/30228
### Apache Airflow version
2.5.2
### What happened
The failure callback called the `clear_task_instances` function, and it
failed to clear the task instance that was passed to it.
### What you think should happen instead
`clear_task_instances` should have cleared the task passed to it, but it did
not. Instead, the task's state was set to `failed` and the task was never
cleared or restarted.
### How to reproduce
Run the following DAG and observe that `failing_task` only fails and is never
cleared, restarted, or retried.

Looking at the XCom list, you will see that the in-memory task object was in
the `running` state before the `clear_task_instances` call and in the
`restarting` state after it. However, when monitoring the `task_instance`
table in the database while this task runs, the task's state only goes from
`running` to `failed` and is never actually set to `restarting`. In version
2.3.1, this code works as expected: the task's state in the `task_instance`
table goes from `running` to `failed` to `null`, successfully clearing it.
```python
import random
from datetime import datetime
from time import sleep

from airflow import DAG
from airflow.models.taskinstance import clear_task_instances
from airflow.operators.python import PythonOperator
from airflow.utils.session import NEW_SESSION, provide_session


@provide_session
def clear_tasks_fn(tis, session=NEW_SESSION, dag=None):
    sleep_time = random.uniform(30.0, 60.0)  # sleep between 30 seconds and 1 minute
    sleep(sleep_time)
    for ti in tis:
        ti.xcom_push(key="before_state", value=ti.state)
    clear_task_instances(tis=tis, session=session, dag=dag)
    for ti in tis:
        session.commit()
        ti.xcom_push(key="after_state", value=ti.state)


def clear_tasks_callback(context):
    all_tasks = context["dag_run"].get_task_instances()
    dag_inner = context["dag"]
    task_ids_to_clear = context["params"].get("task_ids_to_clear", [])
    tasks_to_clear = [ti for ti in all_tasks if ti.task_id in task_ids_to_clear]
    clear_tasks_fn(tasks_to_clear, dag=dag_inner)


def failing_task_fn():
    raise Exception("Forcing task to fail")


def succeeding_task_fn():
    sleep_time = random.uniform(10.0, 20.0)  # sleep between 10 and 20 seconds
    sleep(sleep_time)


default_args = {
    "owner": "revenge",
    "start_date": datetime(2022, 10, 1),
}
schedule = "@once"

the_dag = DAG(
    "clear_task_test",
    schedule_interval=schedule,
    default_args=default_args,
    catchup=False,
    is_paused_upon_creation=True,
)

with the_dag:
    failing_task_t = PythonOperator(
        task_id="failing_task",
        python_callable=failing_task_fn,
        params={"task_ids_to_clear": ["failing_task"]},
        on_failure_callback=clear_tasks_callback,
    )
    succeeding_task_t = PythonOperator(
        task_id="succeeding_task",
        python_callable=succeeding_task_fn,
    )

failing_task_t.set_downstream(succeeding_task_t)
```
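For reference, the clearing behavior expected here (and observed in 2.3.1) can be modeled with plain Python stand-ins, no Airflow required. `FakeTI` and `model_clear` below are hypothetical illustrations, not Airflow APIs: a `running` task should be moved to `restarting`, and a finished (e.g. `failed`) task should have its state reset to `null`/`None` so the scheduler can re-run it.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeTI:
    """Hypothetical stand-in for a task_instance row (illustration only)."""
    task_id: str
    state: Optional[str]


def model_clear(tis):
    """Sketch of the expected clearing semantics: running tasks are set to
    'restarting' so they can be stopped and re-queued; all other tasks have
    their state reset to None (shown as `null` in the database)."""
    for ti in tis:
        ti.state = "restarting" if ti.state == "running" else None


tis = [FakeTI("failing_task", "failed"), FakeTI("succeeding_task", "running")]
model_clear(tis)
print([(ti.task_id, ti.state) for ti in tis])
# → [('failing_task', None), ('succeeding_task', 'restarting')]
```

The bug reported above is that in 2.5.2 the `failed` task instance never reaches that `null` state in the database.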
### Operating System
Debian GNU/Linux 11 (bullseye)
### Versions of Apache Airflow Providers
_No response_
### Deployment
Docker-Compose
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)