Bolke,
Thanks for taking the lead on this. Do you mind creating a Confluence wiki page 
with this proposal, so that we can link the email, JIRAs, and the signal to it? 
We will need a roadmap linked to our main site as well.

Sent from Sid's iPhone 

> On May 1, 2016, at 12:45 PM, Bolke de Bruin <[email protected]> wrote:
> 
> Hi,
> 
> A lot of discussion has been around scheduler issues. Some of the symptoms 
> are: requiring the start_date to align with the (cron) interval, not being 
> able to update a DAG with a new interval, backfills interfering with normal 
> DAG runs, and being unable to version DAGs. Workarounds have also been 
> provided, e.g. ignore_first_depends_on_past. Many issues have been created 
> on GitHub asking why the scheduler does one thing and not another. We can 
> hardly say it is working intuitively at the moment.
> 
> The following is a proposal for an update to the scheduler while remaining 
> 99% backwards compatible. Code (PR-1431) is fully working and all tests pass 
> on Travis. Now I am looking for real-world testers, feedback, etc. :-) 
> (Paul, I understood you are also working on something in the scheduler?)
> 
> SUMMARY
> 
> Model changes
> - TaskInstances now always have an associated DagRun
> - TaskInstances can still be run without a real DagRun but they need the 
> special “-1” DagRun id.
> - Backfills also create DagRuns
> 
> Scheduler
> - Scheduler is aware of backfills
> - Scheduler will fast forward beyond backfill if needed while maintaining the 
> correct interval
> - Scheduler will adjust start date for interval (no more aligning needed)
> 
> Backfills
> - Backfills can update the past, but will maintain lineage
> - Backfills insert themselves in between scheduled dag runs if needed, making 
> depends_on_past work with arbitrarily inserted backfills
> 
> Where might there be issues:
> - If you are creating TaskInstances yourself, inside DAGs or in Operators, 
> you might need to update those to take a dag_run_id into account. For 
> Operators, have a look at the PythonOperator to see how to update them.
> - If you are issuing “airflow run xxx” *yourself*, i.e. from outside 
> Airflow: for now Airflow will set your dag_run_id to -1, but is this really 
> what you want? Please let me know the use cases so we can discuss.
> - For depends_on_past I check at the DagRun level. It might be required to 
> move this to the TaskInstance level. That is not a big change and we can 
> even support both!
> 
> BACKGROUND
> If you consider DAGs a description of the coherence of work and Tasks (as 
> defined in Airflow) a description of work, then DagRuns are the 
> instantiation of DAGs at a point in time. TaskInstances are then the 
> instantiations of Tasks at a point in time. To maintain the coherence 
> between TaskInstances, a DagRun needs to be aware of the TaskInstances it 
> has. So far so good. Airflow does this quite well, and as long as you don’t 
> try to do something fancy like updating your interval with depends_on_past, 
> your tasks will run happily. Now enter backfills. Backfills allow you to 
> either create or alter history. They do this by arbitrarily inserting tasks 
> into the timeline, completely disregarding the scheduler. The scheduler in 
> turn does not know about backfills, which leads to other issues.
> 
> If we maintained a timeline of DagRuns, this would solve the issues 
> mentioned above, improve lineage and pave the way for DAG versioning. It 
> would also simplify the code in the future by moving a lot of logic to 
> DagRun. One might argue that “dag_id + execution_date” currently already 
> does this for both DagRuns and TaskInstances, but that ignores the issues 
> that backfills create in the scheduler, in lineage and in versioning. In 
> addition, it would not allow you to solve the moving interval easily. With 
> backfills you would be able to run an updated DAG that changes the past, but 
> how would you answer which version of the DAG you ran when? Depending on 
> the past in a backfill with a new task is also quite hard.
> 
> IMPLEMENTATION
> DagRuns now maintain a “previous” property. Previous points to the previous 
> dag run by id; if previous is None, the DagRun is considered the first 
> DagRun. The scheduler will set the previous property if it detects a 
> previous run. A previous run can be a scheduled run or a backfill run. The 
> scheduler will adhere to the schedule_interval, but it will fast forward 
> beyond the latest execution date of either the last scheduled run or the 
> last backfill run, whichever comes later. So at the moment the scheduler 
> will not fill in the blanks for you if there is a gap of more than one 
> interval between the scheduled run and a backfill run.
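
[Annotation, not part of Bolke's mail] The fast-forward rule described above can be sketched roughly as follows. This is only an illustration under simplifying assumptions, not the code in the PR: it uses a fixed timedelta instead of a cron expression, assumes the schedule grid starts at start_date, and the function name next_execution_date is hypothetical.

```python
from datetime import datetime, timedelta
from typing import Optional


def next_execution_date(
    start_date: datetime,
    interval: timedelta,
    last_scheduled: Optional[datetime],
    last_backfill: Optional[datetime],
) -> datetime:
    """Return the first slot on the schedule grid after the latest known run.

    Hypothetical helper: the grid is start_date + n * interval.
    """
    known = [d for d in (last_scheduled, last_backfill) if d is not None]
    if not known or max(known) < start_date:
        # Nothing has run yet, so the first slot is the start date itself.
        return start_date
    latest = max(known)  # scheduled run or backfill, whichever comes later
    # Whole intervals elapsed up to `latest`, plus one, gives the next slot.
    steps = (latest - start_date) // interval + 1
    return start_date + steps * interval


if __name__ == "__main__":
    start = datetime(2016, 1, 1)
    day = timedelta(days=1)
    # A backfill ran far ahead of the last scheduled run: the scheduler
    # jumps past it and does not fill the gap in between.
    print(next_execution_date(start, day, datetime(2016, 1, 3),
                              datetime(2016, 1, 10, 12)))
```

Note that the slots between the last scheduled run and the backfill are skipped, which matches the "will not fill in the blanks" behaviour described above.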
> 
> A backfill will create a DagRun and insert it into the timeline, i.e. it 
> will update the previous property of a scheduled DagRun if that DagRun lies 
> in the future as seen from the backfill DagRun. If it encounters a DagRun 
> at the same execution date for the same dag_id, it will re-own the 
> TaskInstances and set the state of the other DagRun to “overridden”, 
> essentially orphaning the other DagRun but keeping its record around for 
> auditing purposes.
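
[Annotation, not part of Bolke's mail] A minimal in-memory sketch of the insertion rule above, assuming a single dag_id. The DagRun dataclass, its field names and insert_backfill are hypothetical stand-ins for illustration; the real model in the PR is a database-backed class and may differ.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional


@dataclass
class DagRun:
    # Hypothetical, simplified stand-in for the real DagRun model.
    run_id: int
    execution_date: datetime
    previous: Optional[int] = None
    state: str = "running"
    task_instances: List[str] = field(default_factory=list)


def insert_backfill(runs: List[DagRun], backfill: DagRun) -> None:
    """Insert a backfill run into the timeline of one DAG (sketch only)."""
    active = sorted(
        (r for r in runs if r.state != "overridden"),
        key=lambda r: r.execution_date,
    )
    for run in active:
        if run.execution_date == backfill.execution_date:
            # Same slot: take over the task instances, keep the old
            # record around for auditing.
            backfill.task_instances = run.task_instances
            run.task_instances = []
            run.state = "overridden"
    earlier = [r for r in active if r.execution_date < backfill.execution_date]
    later = [r for r in active if r.execution_date > backfill.execution_date]
    # Link to the closest earlier run, if any.
    backfill.previous = earlier[-1].run_id if earlier else None
    if later:
        # The next run in the future now follows the backfill.
        later[0].previous = backfill.run_id
    runs.append(backfill)


if __name__ == "__main__":
    timeline = [
        DagRun(1, datetime(2016, 1, 1)),
        DagRun(2, datetime(2016, 1, 3), previous=1),
    ]
    insert_backfill(timeline, DagRun(3, datetime(2016, 1, 2)))
    print([(r.run_id, r.previous, r.state) for r in timeline])
```

Keeping the overridden run in the list, rather than deleting it, mirrors the auditing intent described above.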
> 
> TaskInstances now maintain a reference to a DagRun, which cannot be None. If 
> a TaskInstance is created with None, the reference will be set to the 
> special “-1” DagRun ID. DagRun IDs that are “None” will lead to an integrity 
> error at the database level. TaskInstances with the “-1” DagRun ID are 
> treated as they are now. However, Airflow itself will not create such 
> TaskInstances anymore, so they come from outside. In the context of DAGs 
> being the description of the “coherence of work”, I have a lot of difficulty 
> understanding why there would be work available that is not coherent ;-).
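
[Annotation, not part of Bolke's mail] The "-1" fallback and the database-level integrity error can be illustrated with a small standalone sketch. It uses sqlite3 from the standard library purely for demonstration; the table name, column names and the helper add_task_instance are assumptions, and Airflow's real models are defined differently.

```python
import sqlite3

ADHOC_DAG_RUN_ID = -1  # the special "-1" DagRun id from the proposal


def make_db() -> sqlite3.Connection:
    """Create an in-memory table with a NOT NULL dag_run_id (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_instance ("
        " task_id TEXT NOT NULL,"
        " dag_run_id INTEGER NOT NULL)"
    )
    return conn


def add_task_instance(conn, task_id, dag_run_id=None):
    """Store a task instance, mapping a missing DagRun to the special id."""
    if dag_run_id is None:
        dag_run_id = ADHOC_DAG_RUN_ID
    conn.execute(
        "INSERT INTO task_instance (task_id, dag_run_id) VALUES (?, ?)",
        (task_id, dag_run_id),
    )
    return dag_run_id


if __name__ == "__main__":
    conn = make_db()
    print(add_task_instance(conn, "adhoc_task"))  # falls back to -1
    try:
        # Bypassing the helper with a NULL id is rejected by the database.
        conn.execute("INSERT INTO task_instance VALUES (?, ?)", ("bad", None))
    except sqlite3.IntegrityError as exc:
        print("rejected:", exc)
```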
> 
> 
> Bolke
> 
> 
> 
