Bolke,

Thanks for taking the lead on this. Would you mind creating a Confluence wiki page with this proposal, so that we can link the email, JIRAs, and the signal to it? We will need a roadmap linked from our main site as well.
Sent from Sid's iPhone

> On May 1, 2016, at 12:45 PM, Bolke de Bruin <[email protected]> wrote:
>
> Hi,
>
> A lot of discussion has been around scheduler issues. Some of the symptoms are: requiring the start_date to align with the (cron) interval, not being able to update a DAG with a new interval, backfills interfering with normal DAG runs, and not being able to version DAGs. Workarounds have also been provided, e.g., ignore_first_depends_on_past. Many issues have been created on GitHub asking why the scheduler does one thing and not another. We can hardly say it works intuitively at the moment.
>
> The following is a proposal for an update to the scheduler that remains 99% backwards compatible. The code (PR-1431) is fully working and all tests pass on Travis. Now I am looking for real-world testers and feedback (Paul, I understood you are also working on something in the scheduler?), etc. :-).
>
> SUMMARY
>
> Model changes
> - TaskInstances now always have an associated DagRun.
> - TaskInstances can still be run without a real DagRun, but they need the special “-1” DagRun id.
> - Backfills also create DagRuns.
>
> Scheduler
> - The scheduler is aware of backfills.
> - The scheduler will fast forward beyond a backfill if needed, while maintaining the correct interval.
> - The scheduler will adjust the start date to the interval (no more aligning needed).
>
> Backfills
> - Backfills can update the past, but will maintain lineage.
> - Backfills insert themselves in between scheduled DAG runs if needed, making depends_on_past work with arbitrarily inserted backfills.
>
> Where might there be issues:
> - If you are creating TaskInstances yourself inside DAGs or in Operators, you might need to update that code to take a dag_run_id into account. For Operators, have a look at the PythonOperator to see how to update them.
> - If you are issuing “airflow run xxx” *yourself*, i.e., outside Airflow: for now Airflow will set your dag_run_id to -1, but is this really what you want?
>   Please let me know your use cases so we can discuss them.
> - For depends_on_past I check at the DagRun level; it might be necessary to move this to the TaskInstance level. That is not a big change, and we could even support both!
>
> BACKGROUND
> If you consider DAGs a description of the coherence of work, and Tasks (as defined in Airflow) a description of work, then DagRuns are instantiations of DAGs at a point in time. TaskInstances are then instantiations of Tasks at a point in time. To maintain the coherence between TaskInstances, a DagRun needs to be aware of the TaskInstances it has. So far so good. Airflow does this quite well, and as long as you don’t try to do something fancy like updating your interval with depends_on_past, your tasks will run happily. Now enter backfills. Backfills allow you to either create or alter history. They do this by arbitrarily inserting tasks into the timeline, completely disregarding the scheduler. Also, the scheduler does not know about backfills, which leads to other issues.
>
> If we maintained a timeline of DagRuns, this would solve the issues mentioned above, improve lineage, and pave the way for DAG versioning. It would also simplify the code in the future by moving a lot of logic into DagRun. One might argue that “dag_id + execution_date” already does this for both DagRun and TaskInstances, but that ignores the issues backfills create in the scheduler, in lineage, and in versioning. In addition, it would not let you solve the moving-interval problem easily. With backfills you would be able to run an updated DAG that changes the past, but how would you answer which version of the DAG you ran, and when? Depending on the past in a backfill with a new task is also quite hard.
>
> IMPLEMENTATION
> DagRuns now maintain a “previous” property. Previous points to the previous DagRun by id; if previous is None, the run is considered the first DagRun.
> The scheduler will set the previous property if it detects a previous run. A previous run can be a scheduled run or a backfill run. The scheduler will adhere to the schedule_interval, but it will fast forward beyond the latest execution date of either the last scheduled run or the last backfill run, whichever comes later. So, at the moment, the scheduler will not fill in the blanks for you if there is a gap of more than one interval between the scheduled run and a backfill run.
>
> Backfill will create a DagRun and insert it into the timeline, i.e., it will update the previous property of a scheduled DagRun that lies in the future as seen from the backfill DagRun. If it encounters a DagRun at the same execution date for the same dag_id, it will re-own the TaskInstances and set the state of the other DagRun to “overridden”, essentially orphaning the other DagRun but keeping its record around for auditing purposes.
>
> TaskInstances now maintain a reference to a DagRun, which cannot be None. If a TaskInstance is created with None, the reference will be set to the special “-1” DagRun ID. DagRun IDs that are “None” will lead to an integrity error at the database level. TaskInstances with the “-1” DagRun ID are treated as they are now. However, Airflow itself will no longer create such TaskInstances, so they can only come from outside. In the context of DAGs being the description of “coherence of work”, I have a lot of difficulty understanding why you would have work available that is not coherent ;-).
>
> Bolke
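A minimal sketch of the DagRun timeline the proposal describes: every TaskInstance references a DagRun (None becomes the special -1 id), runs are chained through a `previous` id, and a backfill either slots in between runs or overrides a run at the same execution date while keeping it for auditing. This is plain Python, not Airflow's models or the code in PR-1431; the `DagRun`/`TaskInstance` dataclasses, the `run_type` field, the in-memory id counter, and the `active_runs`/`insert_backfill` helpers are all hypothetical, and relinking the whole chain after each insert is an assumed simplification of updating a single pointer.

```python
from dataclasses import dataclass, field
from datetime import datetime
from itertools import count
from typing import List, Optional

# Special id for TaskInstances created outside Airflow (e.g. a manual "airflow run").
ORPHAN_DAG_RUN_ID = -1

# Hypothetical in-memory id generator standing in for database primary keys.
_next_id = count(1)


@dataclass
class DagRun:
    dag_id: str
    execution_date: datetime
    run_type: str  # "scheduled" or "backfill" (illustrative field)
    id: int = field(default_factory=lambda: next(_next_id))
    previous: Optional[int] = None  # id of the previous DagRun; None = first run
    state: str = "running"


@dataclass
class TaskInstance:
    task_id: str
    execution_date: datetime
    dag_run_id: Optional[int] = None

    def __post_init__(self) -> None:
        # A TaskInstance always references a DagRun; None becomes the -1 id.
        if self.dag_run_id is None:
            self.dag_run_id = ORPHAN_DAG_RUN_ID


def active_runs(runs: List[DagRun], dag_id: str) -> List[DagRun]:
    """Non-overridden runs of one DAG, ordered by execution date."""
    return sorted(
        (r for r in runs if r.dag_id == dag_id and r.state != "overridden"),
        key=lambda r: r.execution_date,
    )


def insert_backfill(runs: List[DagRun], task_instances: List[TaskInstance],
                    backfill: DagRun) -> None:
    """Insert a backfill DagRun into the timeline of its DAG."""
    for run in active_runs(runs, backfill.dag_id):
        if run.execution_date == backfill.execution_date:
            # Same dag_id and execution date: re-own the task instances and
            # keep the old run around, marked "overridden", for auditing.
            for ti in task_instances:
                if ti.dag_run_id == run.id:
                    ti.dag_run_id = backfill.id
            run.state = "overridden"
    runs.append(backfill)
    # Relink "previous" along the active timeline, so a run that lies after
    # the backfill now points at it (overridden runs keep their old pointer).
    prev: Optional[DagRun] = None
    for run in active_runs(runs, backfill.dag_id):
        run.previous = prev.id if prev is not None else None
        prev = run
```

For example, with scheduled runs on 2016-05-01 and 2016-05-03, inserting a backfill for 2016-05-02 leaves the 05-03 run's `previous` pointing at the backfill, which in turn points at the 05-01 run; a second backfill for 05-03 then marks the original 05-03 run "overridden" and takes over its TaskInstances.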

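A minimal sketch of how the next execution date could be computed under this proposal: the start date is rounded up to the interval instead of requiring users to align it, and scheduling fast-forwards past whichever of the last scheduled run or last backfill run is later. This is plain Python, not Airflow's scheduler; the function names, the restriction to fixed `timedelta` intervals, and the 1970-01-01 alignment anchor are assumptions (cron intervals would need a cron parser such as croniter).

```python
from datetime import datetime, timedelta
from typing import Optional

# Assumption: a fixed-interval schedule is aligned to multiples of the interval
# counted from this anchor. Cron expressions would need a cron parser instead.
ANCHOR = datetime(1970, 1, 1)


def align_start_date(start_date: datetime, interval: timedelta) -> datetime:
    """Round start_date up to the next interval boundary (unchanged if on one)."""
    remainder = (start_date - ANCHOR) % interval
    if not remainder:
        return start_date
    return start_date + (interval - remainder)


def next_execution_date(start_date: datetime, interval: timedelta,
                        last_scheduled: Optional[datetime] = None,
                        last_backfill: Optional[datetime] = None) -> datetime:
    """Next date to schedule, fast-forwarding past the latest known run."""
    first = align_start_date(start_date, interval)
    known = [d for d in (last_scheduled, last_backfill) if d is not None]
    if not known:
        return first
    latest = max(known)
    # First interval boundary strictly after the latest run; any gap between
    # the last scheduled run and a later backfill is skipped, not filled in.
    steps = (latest - ANCHOR) // interval + 1
    return max(first, ANCHOR + steps * interval)
```

With a daily interval, a last scheduled run on 2016-05-01 and a backfill on 2016-05-10 give 2016-05-11, and the days in between are not filled in. One consequence of the fixed anchor is that weekly intervals land on Thursdays (1970-01-01 was a Thursday), which is one reason a real implementation would more likely align against the cron expression itself.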