FredericoCoelhoNunes edited a comment on issue #9975:
URL: https://github.com/apache/airflow/issues/9975#issuecomment-790692853


   @zachliu Happy to hear you were able to create a stable API! Unfortunately I 
haven't been able to: when I use Python to select a failed DAG run and clear all 
of its task instances, it seems to trigger something in the scheduler that 
gradually sets all the other DAG runs to the "Running" state as well, over the 
course of a few minutes. Curiously, this doesn't happen when I clear the state 
of a single run in the UI.
   
   Here's the code I was trying to use:
   ```
   import time

   from airflow.models.dagrun import DagRun
   from airflow.models.taskinstance import clear_task_instances
   from airflow import settings

   session = settings.Session()

   # Around 30 failed runs; DAG_ID is the id of the DAG I want to repair
   failed_dag_runs = DagRun.find(
       dag_id=DAG_ID,
       state='failed',
       session=session
   )

   for dag_run in failed_dag_runs:
       task_instances = dag_run.get_task_instances(session=session)
       clear_task_instances(
           task_instances,
           session
       )
       session.commit()

       state = ""
       while state != 'success':
           # Poll the DAG run state; even before this loop exits,
           # the remaining failed DAG runs somehow get cleared too.
           session.refresh(dag_run)
           state = dag_run.state
           time.sleep(5)
   ```
   
   I don't think Airflow was really designed with this sort of "low-level" usage 
in mind; I find its behavior hard to predict. I guess I will stick with manually 
clearing the runs one by one until this problem is fixed.
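   
   One variant I still want to test: `clear_task_instances` accepts an 
`activate_dag_runs` flag (default `True`) that controls whether the DAG runs of 
the cleared task instances are reset to the running state, so it may be related 
to what I'm seeing. A minimal sketch, assuming that parameter exists in your 
Airflow version (newer releases appear to replace it with `dag_run_state`):
   
   ```
   # Untested: activate_dag_runs is an assumption about this Airflow
   # version's signature of clear_task_instances.
   clear_task_instances(
       task_instances,
       session,
       activate_dag_runs=False  # don't touch DAG run state when clearing
   )
   session.commit()
   ```
   
   Note that with `activate_dag_runs=False` the run presumably stays in its 
failed state, so the scheduler may not pick the cleared tasks up again; I only 
mention it because it is the one knob on this code path that touches DAG run 
state.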

