ashb opened a new pull request #10956:
URL: https://github.com/apache/airflow/pull/10956


   Depends upon #10949, ignore first commit when reviewing please.
   
   Here it is. The "big" one. Smaller than I thought, but still a very 
fundamental change!
   
   This PR is still in draft -- there are one or two TODOs left in the code 
that will need to be fixed before final merge, and while they are important, 
there is still enough here to start reviewing.
   
   - [ ] Test that my forward-port of the changes actually works again. (I 
wanted to get the code up for review, so there may be some transcription errors 
breaking things.)
   - [ ] DAG SLAs need to happen via the parsing process
   - [ ] Adding back dagrun_timeout support
   - [ ] dag_run.verify_integrity is slow, and we don't want to call it every 
time, just when the dag structure changes (which we can know now thanks to DAG 
Serialization)
   - [ ] Add a savepoint in verify_integrity (to avoid a rollback killing the 
whole transaction and releasing the locks).
   - [ ] Produce benchmark figures against this branch, not a 4-month-old 
version of the master branch.
   
   This PR implements scheduler HA as proposed in AIP-15. The high level design 
is as follows:
   
   - Move all scheduling decisions into SchedulerJob (requiring DAG 
serialization in the scheduler)
   - Use row-level locks to ensure schedulers don't stomp on each other
     (`SELECT ... FOR UPDATE`)
   - Use `SKIP LOCKED` for better performance when multiple schedulers are
     running. (MySQL < 8 and MariaDB don't support this)
   - Scheduling decisions are not tied to the parsing speed, but can
     operate just on the database
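
As a rough illustration of the `SKIP LOCKED` behaviour described above, here is a toy in-memory analogy. It is not Airflow code and does not talk to a database: `RowLockTable` and its methods are hypothetical names, and per-row `threading.Lock` objects stand in for the row locks a database would hold until the transaction ends.

```python
import threading
from typing import Dict, List


class RowLockTable:
    """Toy stand-in for a table with row-level locks (hypothetical, not Airflow code)."""

    def __init__(self, row_ids: List[int]) -> None:
        # One lock per row, mimicking the per-row locks a database keeps.
        self._locks: Dict[int, threading.Lock] = {
            row_id: threading.Lock() for row_id in row_ids
        }

    def select_for_update_skip_locked(self, limit: int) -> List[int]:
        """Claim up to `limit` unlocked rows, skipping rows someone else holds."""
        claimed: List[int] = []
        for row_id in sorted(self._locks):
            if len(claimed) == limit:
                break
            # Non-blocking acquire: a locked row is skipped rather than waited on.
            if self._locks[row_id].acquire(blocking=False):
                claimed.append(row_id)
        return claimed

    def commit(self, row_ids: List[int]) -> None:
        """Release the row locks, as ending the transaction would."""
        for row_id in row_ids:
            self._locks[row_id].release()


table = RowLockTable([1, 2, 3, 4, 5])
scheduler_a = table.select_for_update_skip_locked(limit=2)  # claims rows 1 and 2
scheduler_b = table.select_for_update_skip_locked(limit=2)  # skips them, claims 3 and 4
```

With plain `SELECT ... FOR UPDATE` the second scheduler would instead block until the first released its rows, which is why skipping locked rows helps when several schedulers run at once.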
   
   **DagFileProcessorProcess**:
   
   Previously this component was responsible for more than just parsing the
   DAG files, as its name might imply. It was also responsible for creating
   DagRuns and for making scheduling decisions about TIs, moving them from
   the "None" state to the "scheduled" state.
   
   This commit changes it so that the DagFileProcessorProcess now will
   update the SerializedDAG row for this DAG, and make no scheduling
   decisions itself.
   
   To make the scheduler's job easier (so that it can make as many
   decisions as possible without having to load the possibly-large
   SerializedDAG row) we store/update some columns on the DagModel table:
   
   - `next_dagrun`: The execution_date of the next dag run that should be
     created (or None)
   - `next_dagrun_create_after`: The earliest point at which the next dag
     run can be created
   
   Pre-computing these values (and updating them every time the DAG is
   parsed) reduces the overall load on the DB, as many decisions can be taken
   by selecting just these two columns/the small DagModel row.
   
   When the DAG is at its max_active_runs limit, or for `@once`, these
   columns will be set to null, meaning "don't create any dag runs".
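
A minimal sketch of how these two columns could be derived, under stated assumptions. `compute_next_dagrun` is a hypothetical helper, not the function in this PR. It assumes a fixed `timedelta` schedule, that a run for a given execution_date is created once its interval has ended, that `interval=None` stands in for `@once`, and that `@once` only yields null after its single run exists.

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple


def compute_next_dagrun(
    last_execution_date: Optional[datetime],
    start_date: datetime,
    interval: Optional[timedelta],
    active_runs: int,
    max_active_runs: int,
) -> Tuple[Optional[datetime], Optional[datetime]]:
    """Return (next_dagrun, next_dagrun_create_after). Hypothetical helper."""
    # At the max_active_runs limit: null both columns so no run is created.
    if active_runs >= max_active_runs:
        return None, None

    if interval is None:
        # Assumption: `@once` has nothing left to create after its single run.
        if last_execution_date is not None:
            return None, None
        return start_date, start_date

    if last_execution_date is None:
        next_dagrun = start_date
    else:
        next_dagrun = last_execution_date + interval

    # Assumption: the run covers [next_dagrun, next_dagrun + interval) and
    # may only be created once that period is over.
    return next_dagrun, next_dagrun + interval
```

The scheduler can then decide whether a DAG needs a new run by comparing `next_dagrun_create_after` with the current time, without loading the SerializedDAG row.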
   
   **SchedulerJob**
   
   The SchedulerJob used to only queue/send tasks to the executor after
   they were parsed, and returned from the DagFileProcessorProcess.
   
   This PR breaks the link between parsing and enqueuing of tasks. Instead
   of looking at DAGs as they are parsed, we now:
   
   - Store a new datetime column, `last_scheduling_decision`, on the DagRun
     table, signifying when a scheduler last examined a DagRun
   - Each time around the loop the scheduler will get (and lock) the next
     _n_ DagRuns via `DagRun.next_dagruns_to_examine`, prioritising DagRuns
     which haven't been touched by a scheduler in the longest period
   - SimpleTaskInstance etc. have been almost entirely removed now, as we
     use the serialized versions
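
The "least recently examined first" ordering can be sketched with the standard-library `sqlite3` module. This is a toy stand-in rather than the real `DagRun.next_dagruns_to_examine`: the table layout, the sample rows, and the `mark_examined` helper are assumptions, and row locking is left out because SQLite has no `SELECT ... FOR UPDATE`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run ("
    "id INTEGER PRIMARY KEY, dag_id TEXT, last_scheduling_decision TEXT)"
)
conn.executemany(
    "INSERT INTO dag_run (id, dag_id, last_scheduling_decision) VALUES (?, ?, ?)",
    [
        (1, "etl", "2020-09-15T10:00:05"),
        (2, "reports", None),  # never examined by any scheduler
        (3, "cleanup", "2020-09-15T10:00:01"),
        (4, "ml", "2020-09-15T10:00:03"),
    ],
)


def next_dagruns_to_examine(conn, limit):
    """Return ids of the `limit` runs examined longest ago, never-examined first."""
    rows = conn.execute(
        "SELECT id FROM dag_run "
        "ORDER BY last_scheduling_decision IS NOT NULL, last_scheduling_decision "
        "LIMIT ?",
        (limit,),
    ).fetchall()
    return [row[0] for row in rows]


def mark_examined(conn, run_ids, now):
    """Record that a scheduler has just looked at these runs."""
    conn.executemany(
        "UPDATE dag_run SET last_scheduling_decision = ? WHERE id = ?",
        [(now, run_id) for run_id in run_ids],
    )


first_batch = next_dagruns_to_examine(conn, 2)
mark_examined(conn, first_batch, "2020-09-15T10:00:10")
second_batch = next_dagruns_to_examine(conn, 2)
```

Because each pass stamps the runs it looked at, the next pass picks up different runs, so every DagRun is visited eventually regardless of how fast the DAG files are parsed.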
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
