Hi,

While working on the scheduler roadmap I have prepared a new PR
(https://github.com/apache/incubator-airflow/pull/1514) that is ready for
review.

Goals:

* Improve readability of the code and uphold generic assumptions (getters
should not change state) for DagRuns
* Improve robustness and lower the risk of race conditions in the scheduler
* Reduce the number of calls to the database and limit connections in the
scheduler
* Identify possible speed optimizations

What has changed:
* Two new TaskInstance states have been introduced: "REMOVED" and "SCHEDULED".
REMOVED is set when taskinstances are encountered that do not exist anymore
in the DAG. This happens when a DAG is changed (i.e. a new version). The
"REMOVED" state exists for lineage purposes. "SCHEDULED" is used when a Task
that did not have a state before is sent to the executor. It is used by both
the scheduler and backfills. This state almost removes the race condition that
exists when running multiple schedulers. Because UP_FOR_RETRY is managed by
the TaskInstance itself (I think that is the wrong place), the race condition
still exists for that state.
* get_active_runs was a getter that also wrote to the database. This
patch refactors get_active_runs into two different functions that are now part
of DagRun: 1) update_state updates the state of the dagrun based on the
taskinstances of the dagrun. 2) verify_integrity checks and updates the dag run
based on whether the dag contains new or missing tasks.
* DagRun.update_state has been updated to not call the database twice for the
same functions. This reduces the time spent here by 50% on certain occasions,
when a Dag has many tasks that need to be evaluated. Still, this needs to
be faster: for Dags with many tasks the aggregation query in
TaskInstance.are_dependencies_met is very expensive. It should be refactored.
* process_dag has been updated to use these functions and cleaned up, making it
much more readable. Tasks are now properly locked by the database. I have
played with multiprocessing here (on dagruns and taskinstances) but left it out
for now. I think fixing the above will help more.
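
To illustrate the idea behind the SCHEDULED state, here is a minimal sketch.
It is not the code in the PR: the table layout and the claim_task helper are
hypothetical, and sqlite3 only stands in for the real metadata database. The
point is that moving a task from "no state" to "scheduled" in a single
conditional UPDATE lets only one scheduler win.

```python
import sqlite3


def claim_task(conn, task_id):
    """Hypothetical helper: move a task from no state (NULL) to 'scheduled'.

    Returns True only for the caller whose UPDATE actually changed the row.
    """
    cur = conn.execute(
        "UPDATE task_instance SET state = 'scheduled' "
        "WHERE task_id = ? AND state IS NULL",
        (task_id,),
    )
    conn.commit()
    # rowcount is 1 if this call changed the row, 0 if it was already claimed
    return cur.rowcount == 1


# Illustrative schema, not Airflow's real table definition
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT PRIMARY KEY, state TEXT)")
conn.execute("INSERT INTO task_instance VALUES ('t1', NULL)")
conn.commit()

first = claim_task(conn, "t1")   # first scheduler sends the task to the executor
second = claim_task(conn, "t1")  # second scheduler sees it is already claimed
print(first, second)
```

A state that is changed outside such a guarded transition, as UP_FOR_RETRY
is today, does not get this protection.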

More info in the PR itself.

Please let me know what you think!

- Bolke
