mikemoto opened a new pull request #19706:
URL: https://github.com/apache/airflow/pull/19706


   Consider this case:
    1. I use the SequentialExecutor, with SQLite as the database
    2. the DAG has two tasks (as the picture below shows), and there is NO dependency between them
   
![image](https://user-images.githubusercontent.com/4498225/142616982-aef93cbc-312f-4e34-abfd-50ee14b4b7aa.png)
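   For reference, here is a minimal sketch of such a DAG (the dag id and task ids are illustrative, not the exact ones from my setup):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Two tasks with NO dependency between them, so the scheduler
# queues both in the same scheduling loop.
with DAG(
    dag_id="sleep_demo",
    start_date=datetime(2021, 11, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    sleep_1 = BashOperator(task_id="sleep_1", bash_command="sleep 60")
    sleep_2 = BashOperator(task_id="sleep_2", bash_command="sleep 60")
    # Note: deliberately no sleep_1 >> sleep_2 here
```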
   
   Then, when I start a DAG run, both tasks will be added to the SequentialExecutor in the same scheduler loop.
   
   When I stop the DAG run while the first task instance (let's assume it is the [sleep_1] step) is running, using the set_dag_run_state_to_failed function from airflow.api.common.experimental.mark_tasks, I can see that the state of the DAG run is changed to FAILED, and the state of the task instance of task [sleep_1] is also changed to FAILED, but the task instance of [sleep_2] is left unchanged (queued).
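   Roughly, this is how I mark the run failed while [sleep_1] is still running (a sketch only; the exact parameters of set_dag_run_state_to_failed vary between Airflow versions, and the dag id and execution date below are placeholders):

```python
from datetime import datetime

from airflow.api.common.experimental.mark_tasks import set_dag_run_state_to_failed
from airflow.models import DagBag

dag = DagBag().get_dag("sleep_demo")  # illustrative dag id

# Placeholder: the execution_date of the DagRun that is currently running.
running_execution_date = datetime(2021, 11, 19)

set_dag_run_state_to_failed(
    dag=dag,
    execution_date=running_execution_date,
    commit=True,  # persist the FAILED states instead of a dry run
)
```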
   
   Then, something weird happens: the task instance of [sleep_2] is still executed as usual. That's not what I want!
   
   I checked the code and found that the SequentialExecutor will still run every task instance in its commands_to_run list, even if a task instance of the same DAG run has already failed.
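   For context, the executor's sync loop looks roughly like this (paraphrased from airflow/executors/sequential_executor.py in Airflow 2.x; details may differ by version):

```python
# Paraphrased sketch: every queued command is executed unconditionally,
# with no re-check of the owning DagRun's state before launching it.
def sync(self) -> None:
    for key, command in self.commands_to_run:
        self.log.info("Executing command: %s", command)
        try:
            subprocess.check_call(command, close_fds=True)
            self.change_state(key, State.SUCCESS)
        except subprocess.CalledProcessError as e:
            self.change_state(key, State.FAILED)
            self.log.error("Failed to execute task %s.", str(e))
    self.commands_to_run = []
```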
   
   And even when running the [airflow tasks run] command, the check_and_change_state_before_execution function in models/taskinstance.py does not check whether the DAG run of the task instance has failed or not.
   
   So I added DagrunRunningDep to RUNNING_DEPS, and things go right again.
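   In sketch form, the change amounts to the following (the existing members of RUNNING_DEPS are elided here; DagrunRunningDep already exists in airflow/ti_deps/deps/dagrun_exists_dep.py and fails the dependency check when the task instance's DagRun is not in the RUNNING state):

```python
from airflow.ti_deps.deps.dagrun_exists_dep import DagrunRunningDep

RUNNING_DEPS = {
    # ... the existing running deps (valid state, slot availability, ...) ...
    DagrunRunningDep(),  # new: refuse to run a task whose DagRun is not RUNNING
}
```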

