FredericoCoelhoNunes edited a comment on issue #9975:
URL: https://github.com/apache/airflow/issues/9975#issuecomment-790692853
@zachliu Happy to hear you were able to create a stable API! Unfortunately, I
haven't had the same luck: when I use Python to select a failed DAG run and
clear all of its task instances, it seems to trigger some process in the
scheduler that gradually sets all the other DAG runs to the "Running" state as
well, over the course of a few minutes. Curiously, this doesn't happen if I
clear the state of a single run in the UI.
Here's the code I was trying to use:
```
from airflow import settings
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import clear_task_instances

session = settings.Session()

# Around 30 failed runs
failed_dag_runs = DagRun.find(
    dag_id=DAG_ID,
    state='failed',
    session=session
)

for i, dag_run in enumerate(failed_dag_runs):
    run_id = dag_run.run_id
    task_instances = dag_run.get_task_instances(session=session)
    clear_task_instances(
        task_instances,
        session
    )
    session.commit()

    state = ""
    while state != 'success':
        # Here I was checking the DAG run state, but even before it left
        # this loop, it somehow clears the remaining DAG runs.
        ...
```
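For reference, the waiting step elided in the loop above could be sketched roughly like this. It is only an illustration: `wait_for_state` and its parameters are hypothetical names, not part of Airflow, and it does nothing about the scheduler behavior described above. It just bounds the polling so the loop cannot spin forever.

```python
import time
from typing import Callable


def wait_for_state(get_state: Callable[[], str],
                   target: str = "success",
                   timeout: float = 600.0,
                   interval: float = 10.0) -> bool:
    """Poll get_state() until it returns `target` or `timeout` seconds pass.

    Hypothetical helper (not an Airflow API). Returns True if the target
    state was observed, False if the timeout expired first.
    """
    deadline = time.monotonic() + timeout
    while True:
        # Check the current state first so an already-finished run
        # returns immediately.
        if get_state() == target:
            return True
        # Give up once the deadline has passed.
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

In the loop above, `get_state` would be a small function that re-reads the DAG run from the database and returns its state, for example by calling `dag_run.refresh_from_db()` and then reading `dag_run.state` (assuming that method behaves this way in the installed Airflow version).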
I don't think Airflow was really developed with this sort of "low level" usage
in mind, since its behavior here is hard to predict. I guess I will stick
with manually clearing the runs one by one until this problem is fixed.