ashb opened a new pull request #10956:
URL: https://github.com/apache/airflow/pull/10956
Depends upon #10949, ignore first commit when reviewing please.
Here it is. The "big" one. Smaller than I thought, but still a very
fundamental change!
This PR is still in draft -- there are one or two TODOs left in the code
that will need to be fixed before final merge, and while they are import, there
is still enough here to start reviewing.
- [ ] Test that my forward-port of the changes actually works again. (I
wanted to get the code up for review, so there may be some transcription errors
breaking things.)
- [ ] DAG SLAs need to happen via the parsing process
- [ ] Adding back dagrun_timeout support
- [ ] dag_run.verify_integrity is slow, and we don't want to call it every
time, just when the dag structure changes (which we can know now thanks to DAG
Serialization)
- [ ] Add a savepoint in verify_integrity (to avoid rollback killing the
whole transaction and releasing the locks.
- [ ] Produce benchmark figures against this branch, not a 4 month old
version of master branch.
This PR implements scheduler HA as proposed in AIP-15. The high level design
is as follows:
- Move all scheduling decisions into SchedulerJob (requiring DAG
serialization in the scheduler)
- Use row-level locks to ensure schedulers don't stomp on each other
(`SELECT ... FOR UPDATE`)
- Use `SKIP LOCKED` for better performance when multiple schedulers are
running. (Mysql < 8 and MariaDB don't support this)
- Scheduling decisions are not tied to the parsing speed, but can
operate just on the database
**DagFileProcessorProcess**:
Previously this component was responsible for more than just parsing the
DAG files as it's name might imply. It also was responsible for creating
DagRuns, and also making scheduling decisions of TIs, sending them from
"None" to "scheduled" state.
This commit changes it so that the DagFileProcessorProcess now will
update the SerializedDAG row for this DAG, and make no scheduling
decisions itself.
To make the scheduler's job easier (so that it can make as many
decisions as possible without having to load the possibly-large
SerializedDAG row) we store/update some columns on the DagModel table:
- `next_dagrun`: The execution_date of the next dag run that should be
created (or
None)
- `next_dagrun_create_after`: The earliest point at which the next dag
run can be created
Pre-computing these values (and updating them every time the DAG is
parsed) reduce the overall load on the DB as many decisions can be taken
by selecting just these two columns/the small DagModel row.
In case of max_active_runs, or `@once` these columns will be set to
null, meaning "don't create any dag runs"
**SchedulerJob**
The SchedulerJob used to only queue/send tasks to the executor after
they were parsed, and returned from the DagFileProcessorProcess.
This PR breaks the link between parsing and enqueuing of tasks, instead
of looking at DAGs as they are parsed, we now:
- store a new datetime column, `last_scheduling_decision` on DagRun
table, signifying when a scheduler last examined a DagRun
- Each time around the loop the scheduler will get (and lock) the next
_n_ DagRuns via `DagRun.next_dagruns_to_examine`, prioritising DagRuns
which haven't been touched by a scheduler in the longest period
- SimpleTaskInstance etc have been almost entirely removed now, as we
use the serialized versions
---
**^ Add meaningful description above**
Read the **[Pull Request
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)**
for more information.
In case of fundamental code change, Airflow Improvement Proposal
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party
License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]