ydodeja365 opened a new pull request #17961:
URL: https://github.com/apache/airflow/pull/17961


   ## Issue:
   The clear task instances endpoint can clear every DAG run of every DAG when
   no task instances match the parameters given in the request body and
   `reset_dag_runs` is true.
   As a result, all DAG runs lose their `start_date` and `end_date`. When
   max_active_dag_runs is a low number, new runs are also delayed until the
   scheduler re-marks every DAG run as failed/success.
   
   ## Cause:
   
   In airflow/api_connexion/endpoints/task_instance_endpoint.py, `post_clear_task_instances` function:
   ```python
   task_instances = dag.clear(dry_run=True, dag_bag=current_app.dag_bag, **data)
   if not dry_run:
       clear_task_instances(
           task_instances,
           session,
           dag=dag,
           dag_run_state=State.RUNNING if reset_dag_runs else False,
       )
   ```
   `dag.clear()` returns a SQLAlchemy Query object. If no task instances match
   the request body, `task_instances` holds a Query that yields no rows. This
   object is then passed to `clear_task_instances`, which expects a list rather
   than a Query object.
   
   In airflow/models/taskinstance.py, `clear_task_instances` function:
   ```python
       if dag_run_state is not False and tis:
           from airflow.models.dagrun import DagRun  # Avoid circular import
   
           dates_by_dag_id = defaultdict(set)
           for instance in tis:
               dates_by_dag_id[instance.dag_id].add(instance.execution_date)
   
           drs = (
               session.query(DagRun)
               .filter(
                   or_(
                       and_(DagRun.dag_id == dag_id, DagRun.execution_date.in_(dates))
                       for dag_id, dates in dates_by_dag_id.items()
                   )
               )
               .all()
           )
   ```
   Since `tis` is a Query object and not an empty list (`[]`), it is truthy even
   when it matches no rows, so the if condition is satisfied. `dates_by_dag_id`
   stays empty because there are zero ti objects in `tis`, so the expression
   inside `or_()` contributes no filter condition and the query reduces to
   `drs = session.query(DagRun).all()`. Every DAG run of every DAG is therefore
   selected and set to the running state.
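
The truthiness difference described above can be reproduced without a database. The following is a minimal sketch: `FakeQuery` is a hypothetical stand-in, not SQLAlchemy's `Query`, and it only assumes that a query-like object is iterable and defines neither `__bool__` nor `__len__`.

```python
class FakeQuery:
    """Hypothetical stand-in for a query-like object.

    It is iterable and has .all(), but defines neither __bool__ nor __len__,
    so Python treats every instance as truthy.
    """

    def __init__(self, rows):
        self._rows = list(rows)

    def __iter__(self):
        return iter(self._rows)

    def all(self):
        # Materialize the rows into a plain list.
        return list(self._rows)


empty_query = FakeQuery([])

# The object itself is truthy even though it yields no rows.
query_is_truthy = bool(empty_query)

# The materialized list is empty and therefore falsy.
list_is_truthy = bool(empty_query.all())
```

A guard such as `if dag_run_state is not False and tis:` only behaves as intended when `tis` is a real list.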
   
   ## Fix:
   Passing `task_instances.all()` to `clear_task_instances()` passes an empty
   list when nothing matches. An empty list is falsy, so the if block is
   skipped. Tested in a local Breeze environment, and this fixes the issue.
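
The effect of the fix can be illustrated with a simplified in-memory model. This is a sketch under stated assumptions, not Airflow's code: `EmptyQuery` and `runs_to_reset` are hypothetical names, and DAG runs are modelled as `(dag_id, execution_date)` tuples.

```python
from collections import defaultdict


class EmptyQuery:
    """Hypothetical stand-in for a query that matches no rows (always truthy)."""

    def __iter__(self):
        return iter([])

    def all(self):
        return []


def runs_to_reset(tis, all_runs):
    """Simplified model of the DAG-run selection; not Airflow's actual code."""
    if not tis:
        return []  # nothing matched, so nothing should be reset

    dates_by_dag_id = defaultdict(set)
    for dag_id, execution_date in tis:
        dates_by_dag_id[dag_id].add(execution_date)

    if not dates_by_dag_id:
        # Models a filter with no conditions: every run is selected.
        return list(all_runs)

    return [
        run for run in all_runs
        if run[1] in dates_by_dag_id.get(run[0], set())
    ]


all_runs = [("tutorial", "2021-08-30"), ("example_bash_operator", "2021-08-30")]

# Query passed directly: the guard is bypassed and every run is selected.
before_fix = runs_to_reset(EmptyQuery(), all_runs)

# Materialized list passed: the guard stops the function early.
after_fix = runs_to_reset(EmptyQuery().all(), all_runs)
```

In this model, only the call that passes the materialized list leaves the unrelated runs untouched.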
   
   ## An Example:
   
   ### Before Clear API was called:
   #### A sample DAG tutorial and its runs:
   
![before_clear_dag](https://user-images.githubusercontent.com/22529841/131664830-fb134ad2-1d3d-4214-99fd-bcbcd10fe392.PNG)
   #### Some example DAG runs:
   
![before_clear_dags](https://user-images.githubusercontent.com/22529841/131664833-60fd3b81-5a10-4d57-8881-762926c01b8b.PNG)
   
   ### Clear API Request Sent:
   POST URL: http://localhost:28080/api/v1/dags/tutorial/clearTaskInstances (on
   Breeze env)
   Body: `reset_dag_runs` is `true` and `start_date` is a future date, so no
   task instances match the filters.
   ```json
   {
       "dry_run": false,
       "start_date": "2021-10-30T00:00:00+00:00",
       "task_ids": ["print_date"],
       "only_failed": false,
       "reset_dag_runs": true
   }
   ```
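
For anyone reproducing this, the request above could be built with the Python standard library roughly as follows. This is only a sketch: it constructs the request without sending it, and authentication is omitted because the setup used in the Breeze environment is not stated here.

```python
import json
import urllib.request

# URL and body are taken from the example above.
url = "http://localhost:28080/api/v1/dags/tutorial/clearTaskInstances"
body = {
    "dry_run": False,
    "start_date": "2021-10-30T00:00:00+00:00",
    "task_ids": ["print_date"],
    "only_failed": False,
    "reset_dag_runs": True,
}

request = urllib.request.Request(
    url,
    data=json.dumps(body).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)

# To actually send it (assumption: suitable credentials have been added
# to the headers first):
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```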
   
   ### After Clear API was called:
   #### DAG tutorial with all its runs cleared:
   
![after_clear_dags](https://user-images.githubusercontent.com/22529841/131664821-26068498-7c72-4886-b1a1-5df3a9f8872e.PNG)
   #### Example DAGs:
   
![after_clear_dag](https://user-images.githubusercontent.com/22529841/131664837-653f4ba3-1313-487b-842c-c2eef6112cd0.PNG)
   
   
   


