mikemoto opened a new pull request #19706:
URL: https://github.com/apache/airflow/pull/19706
Let see this case:
1. i use sequential executor, and use sqlite as db
2. the dag has two task(as the pic shown below), and there is NO dependeny
between them

Then, whent i start a dag run, both task will be added to sequential
executor in the same schduler loop.
When i stop the dagrun when the first taskinstance(let's assume the is the
[sleep_1] step) is running, using set_dag_run_state_to_failed function from
airflow.api.common.experimental.mark_tasks. I can see the state of dagrun will
be changed into FAILED, and the state of taskinstance of task [sleep_1] will
also changed into FAILED, but the taskinstance of [sleep_2] will be left
unchanged(queued)
Then, something weired happend, the task instance of [sleep_2] will be
executed as usual. That's not what i want!
I check the code, and find the sequential executor will still run all the
taskintance in commane_to_run list, even if some taskinstance of the same
dagrun has already failed.
And, even when running [airflow task run] command, the
check_and_change_state_before_execution function in model/taskinstance.py will
not check whether the dagrun of the taskinstance has failed or not.
So i add the add DagrunRunningDep into RUNNING_DEPS, and things go right
again.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]