SamWheating opened a new issue #21908:
URL: https://github.com/apache/airflow/issues/21908


   ### Apache Airflow version
   
   2.2.4 (latest released)
   
   ### What happened
   
   I was running a large DAG with limited concurrency and wanted to cancel the current run. I marked the run as `Failed` via the UI, which terminated all running tasks and marked them as `Failed`.
   
   However, a few seconds later the run was set back to `Running` and other tasks started to execute.
   
   <img width="1235" alt="image" src="https://user-images.githubusercontent.com/16950874/156228662-e06dd71a-e8ef-4cdd-b958-5ddefa1d5328.png">
   
   
   I think this is caused by two things:
   
    1) Marking a run as failed only stops the currently running tasks and marks them as failed; it does nothing to tasks in the `scheduled` state:
    
https://github.com/apache/airflow/blob/0cd3b11f3a5c406fbbd4433d8e44d326086db634/airflow/api/common/mark_tasks.py#L455-L462
    
    2) During scheduling, a DagRun that still has tasks in unfinished states is set back to `Running`:
    
    
https://github.com/apache/airflow/blob/feea143af9b1db3b1f8cd8d29677f0b2b2ab757a/airflow/models/dagrun.py#L583-L585
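To make the interaction between these two points concrete, here is a toy model in plain Python. It does not import Airflow: the helpers `mark_run_failed_current` and `recompute_run_state` are hypothetical, and the run-state logic is a deliberate simplification of the linked code (it ignores the root/leaf distinction, deadlock detection and retries).

```python
from typing import Dict, Optional

# Hypothetical toy model, NOT Airflow code. States are plain strings;
# None stands for a task instance that has no state yet.
FINISHED_STATES = {"success", "failed", "skipped", "upstream_failed"}
FAILED_STATES = {"failed", "upstream_failed"}


def mark_run_failed_current(task_states: Dict[str, Optional[str]]) -> Dict[str, Optional[str]]:
    """Point 1: only running tasks are set to failed; scheduled tasks keep their state."""
    return {
        task_id: ("failed" if state == "running" else state)
        for task_id, state in task_states.items()
    }


def recompute_run_state(task_states: Dict[str, Optional[str]]) -> str:
    """Point 2 (simplified): any unfinished task puts the run back into 'running'."""
    states = task_states.values()
    if any(state not in FINISHED_STATES for state in states):
        return "running"
    if any(state in FAILED_STATES for state in states):
        return "failed"
    return "success"


# Mirrors the reproduction DAG further down: one task slot, task_0 runs, the rest wait.
tasks = {"task_0": "running", **{f"task_{i}": "scheduled" for i in range(1, 5)}}
after_mark = mark_run_failed_current(tasks)
print(after_mark["task_0"], after_mark["task_1"])  # failed scheduled
print(recompute_run_state(after_mark))  # running
```

In this model the run comes back to `running` purely because the scheduled tasks were never touched, which matches what the screenshot shows.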
   
   I'm assuming this is unintended behaviour; is that correct?
   
   ### What you expected to happen
   
   I think marking a DAG run as failed should stop the run for good (it should not be resumed), regardless of the state of its tasks.
   
   When a DagRun is marked failed, we should:
    - mark all running tasks failed
    - **mark all non-finished tasks as skipped**
    - mark the DagRun as `failed`
    
   This is consistent with what already happens when a DagRun times out.
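A minimal sketch of the proposed behaviour, under toy assumptions: states are plain strings, `None` means a task with no state yet, and `mark_run_failed_proposed` is a hypothetical helper, not a patch to the real `mark_tasks.py`. The point it illustrates is that once no task is left unfinished, the scheduler's "unfinished tasks keep the run running" branch has nothing to act on.

```python
from typing import Dict, Optional, Tuple

# Hypothetical toy model of the proposal, NOT Airflow code.
FINISHED_STATES = {"success", "failed", "skipped", "upstream_failed"}


def mark_run_failed_proposed(
    task_states: Dict[str, Optional[str]],
) -> Tuple[str, Dict[str, Optional[str]]]:
    """Fail running tasks, skip every other unfinished task, and fail the run."""
    new_states: Dict[str, Optional[str]] = {}
    for task_id, state in task_states.items():
        if state == "running":
            new_states[task_id] = "failed"   # stop what is currently executing
        elif state not in FINISHED_STATES:
            new_states[task_id] = "skipped"  # any other unfinished state (scheduled, queued, None, ...)
        else:
            new_states[task_id] = state      # finished tasks keep their result
    return "failed", new_states


run_state, tasks = mark_run_failed_proposed(
    {"task_0": "running", "task_1": "scheduled", "task_2": None, "task_3": "success"}
)
print(run_state)  # failed
print(tasks)  # {'task_0': 'failed', 'task_1': 'skipped', 'task_2': 'skipped', 'task_3': 'success'}
# Nothing is left unfinished, so no task state can pull the run back to running.
print(any(s not in FINISHED_STATES for s in tasks.values()))  # False
```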
   
   ### How to reproduce
   
   Run this DAG:
   
   ```python
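   from datetime import timedelta
   
   from airflow.models import DAG
   from airflow.operators.bash import BashOperator  # replaces the deprecated airflow.operators.bash_operator
   from airflow.utils.dates import days_ago
   
   dag = DAG(
       'cant-be-stopped',
       start_date=days_ago(1),
       max_active_runs=1,
       dagrun_timeout=timedelta(minutes=60),
       schedule_interval=None,  # triggered manually only
       max_active_tasks=1,  # Airflow 2.2+ name for the deprecated `concurrency` argument
   )
   
   # Five independent tasks; with a single task slot only one runs at a time
   # and the others have to wait for it.
   for i in range(5):
       BashOperator(
           task_id=f'task_{i}',
           bash_command='sleep 300',
           retries=0,
           dag=dag,
       )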
   ```
   
   Once the first task is running, mark the run as failed. After the next scheduler loop, the run is set back to `running` and a different task starts.
   
   ### Operating System
   
   Debian GNU/Linux 10 (buster)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   I noticed this in Airflow 2.2.2 and reproduced it in a Breeze environment on the latest `main`.
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

