> On 2 May 2016, at 13:48, Jeremiah Lowin <[email protected]> wrote:
>
> Well done Bolke and thanks for all the work on this! Looking forward to
> kicking the tires later today.
>
> After last week's conversations I understand why users would want a run_id
> but I'm not totally sold that (dag_id, execution_date) is an insufficient
> primary key for a DagRun. My reasoning is this: if (dag_id, execution_date)
> is a bad primary key, then there must be times we violate it. But under
> what circumstances would we ever want two database entries for the state of
> the same DAG on the same execution_date? It seems to me that if we update
> an already-known DagRun state -- for example, by backfilling over a
> scheduled run or running backfill twice -- then we are *updating* the
> existing DagRun rather than *creating* a new DagRun, and so the primary key
> uniqueness is maintained.
>
For lineage purposes I am currently suggesting maintaining the DagRuns in
the db, i.e. the backfill will set the state of the other DagRun to
“overridden” and thus a new DagRun will exist with the same dag_id +
execution_date. This would allow for versioning and for understanding which
versions ran in the past. However, if there is a better way to solve this I
am open to it. If we remove that need, it would probably also remove the
need for having a dag_run_id with the task instances, although I have a
“gut” feeling that that might not be entirely true.

> Bolke proposed having a larger (realtime) conversation around this and I
> think it's a great idea. Perhaps we could do that this week? As you all
> know I'm a big proponent of refactoring this whole area and on the whole
> I'm hugely supportive of this work -- I just have this one outstanding
> question as outlined above.
>
> On Sun, May 1, 2016 at 4:15 PM Bolke de Bruin <[email protected]> wrote:
>
>> Done!
>>
>> https://cwiki.apache.org/confluence/display/AIRFLOW/Improving+the+scheduler+by+making+dag+runs+more+coherent
>>
>> Bolke
>>
>>> On 1 May 2016, at 21:52, Siddharth Anand <[email protected]> wrote:
>>>
>>> Bolke,
>>> Thanks for taking the lead on this. Do you mind creating a confluence
>>> wiki page with this proposal, so that we can link email, jiras, and the
>>> signal to it? We will need a roadmap linked to our main site as well.
>>>
>>> Sent from Sid's iPhone
>>>
>>>> On May 1, 2016, at 12:45 PM, Bolke de Bruin <[email protected]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> A lot of discussion has been around scheduler issues. Some of the
>>>> symptoms are requiring the start_date to align with the (cron)interval,
>>>> not being able to update a DAG with a new interval, backfills
>>>> interfering with normal dag runs, and being unable to version dags.
>>>> Workarounds have also been provided, e.g. ignore_first_depends_on_past.
>>>> Many issues on GitHub have been created asking why the scheduler does
>>>> this and not that. We can hardly say it is working intuitively at the
>>>> moment.
>>>>
>>>> The following is a proposal for an update to the scheduler while
>>>> remaining 99% backwards compatible. Code (PR-1431) is fully working and
>>>> all tests pass on Travis. Now I am looking for real world testers,
>>>> feedback - Paul, I understood you are also working on something on the
>>>> scheduler? - etc. :-).
>>>>
>>>> SUMMARY
>>>>
>>>> Model changes
>>>> - TaskInstances now always have an associated DagRun
>>>> - TaskInstances can still be run without a real DagRun but they need
>>>>   the special “-1” DagRun id.
>>>> - Backfills also create DagRuns
>>>>
>>>> Scheduler
>>>> - Scheduler is aware of backfills
>>>> - Scheduler will fast forward beyond backfill if needed while
>>>>   maintaining the correct interval
>>>> - Scheduler will adjust start date for interval (no more aligning
>>>>   needed)
>>>>
>>>> Backfills
>>>> - Backfills can update the past, but will maintain lineage
>>>> - Backfills insert themselves in between scheduled dag runs if needed,
>>>>   making depends_on_past work with arbitrary inserted backfills
>>>>
>>>> Where might there be issues:
>>>> - If you are creating TaskInstances inside DAGs yourself or in
>>>>   Operators you might need to update those to take into account a
>>>>   dag_run_id. For Operators, have a look at the PythonOperator to see
>>>>   how to update it.
>>>> - If you are issuing “airflow run xxx” *yourself*, thus outside
>>>>   airflow: for now Airflow will set your dag_run_id to -1, but is this
>>>>   really what you want? Please let me know the use cases and discuss.
>>>> - For depends_on_past I check on the DagRun level; it might be required
>>>>   to move this to the TaskInstance level. Not a big change and we can
>>>>   even support both!
>>>>
>>>> BACKGROUND
>>>> If you consider DAGs a description of coherence of work and Tasks (as
>>>> defined in Airflow) a description of work, then DagRuns are the
>>>> instantiation of DAGs at a point in time. TaskInstances are then the
>>>> instantiations of Tasks at a point in time. To maintain the coherence
>>>> between TaskInstances a DagRun needs to be aware of the TaskInstances it
>>>> has. So far so good. Airflow does this quite well and as long as you
>>>> don’t try to do something fancy like updating your interval with
>>>> depends_on_past your tasks will run happily. Now enter backfills.
>>>> Backfills allow you to either create or alter history. They do this by
>>>> arbitrarily inserting tasks into the timeline and completely
>>>> disregarding the scheduler. Also, the scheduler does not know about
>>>> backfills, leading to other issues.
>>>>
>>>> If we would maintain a timeline of DagRuns this would solve the above
>>>> mentioned issues, improve lineage and pave the way for DAG versioning.
>>>> It would also simplify the code in the future by moving a lot of logic
>>>> to DagRun. While one might argue that currently “dag_id +
>>>> execution_date” already does this for both DagRun and TaskInstances,
>>>> this ignores the issues that backfills create in the scheduler, in
>>>> lineage and in versioning. In addition, it would not allow you to solve
>>>> the moving interval easily. With backfills you would be able to run an
>>>> updated dag that changes the past, but how do you answer which version
>>>> of the DAG ran when? Depending on past in a backfill with a new task is
>>>> also quite hard.
>>>>
>>>> IMPLEMENTATION
>>>> DagRuns now maintain a “previous” property. Previous points to the
>>>> previous dag run by id; if previous is None it is considered the first
>>>> DagRun. The scheduler will set the previous property if it detects a
>>>> previous run. A previous run can be a scheduled run or a backfill run.
>>>> The scheduler will adhere to the schedule_interval, but it will fast
>>>> forward beyond the latest execution date of either the last scheduled
>>>> run or the last backfill run, whichever comes later. So at the moment
>>>> the scheduler will not fill in the blanks for you if there is a gap
>>>> between the scheduled run and a backfill run of more than one interval.
>>>>
>>>> Backfill will create a DagRun and insert it into the timeline, i.e. it
>>>> will update the previous property of a scheduled DagRun if that run
>>>> lies in the future as seen from the backfill DagRun. If it encounters a
>>>> DagRun at the same execution date for the same dag_id it will re-own
>>>> the TaskInstances and set the state of the other DagRun to
>>>> “overridden”, essentially orphaning the other DagRun but keeping its
>>>> record around for auditing purposes.
>>>>
>>>> TaskInstances now maintain a reference to a DagRun, which cannot be
>>>> None. If it is created with None it will be set to the special “-1”
>>>> DagRun ID. DagRun IDs that are “None” will lead to an integrity error
>>>> at the database level. “-1” DagRun ID TaskInstances are treated as they
>>>> are now. However, Airflow itself will not create such TaskInstances
>>>> anymore, so they come from outside. In the context of the DAGs being
>>>> the description of “coherence of work” I have a lot of difficulty
>>>> understanding having work available that is not coherent ;-).
>>>>
>>>> Bolke
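
To make the timeline idea in the thread above concrete, here is a minimal
in-memory sketch in Python. It is not the code from PR-1431 and not
Airflow's API: the class names mirror the terms used in the proposal, but
the Timeline class, its methods, the state strings and NO_DAG_RUN_ID are
hypothetical names chosen for illustration. It shows three behaviours
described in the proposal: a backfill inserting itself between scheduled
runs by updating "previous", a backfill at an existing execution date
marking the older run "overridden" while keeping its record, and a
TaskInstance created without a DagRun falling back to the special -1 id.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

NO_DAG_RUN_ID = -1          # hypothetical constant for the special "-1" id
OVERRIDDEN = "overridden"   # state named in the proposal


@dataclass
class DagRun:
    id: int
    dag_id: str
    execution_date: datetime
    run_type: str                    # "scheduled" or "backfill"
    state: str = "running"
    previous: Optional[int] = None   # id of the previous run, None if first


@dataclass
class TaskInstance:
    task_id: str
    dag_run_id: Optional[int] = None

    def __post_init__(self) -> None:
        # A TaskInstance created without a DagRun gets the special id.
        if self.dag_run_id is None:
            self.dag_run_id = NO_DAG_RUN_ID


class Timeline:
    """Assumed helper: all DagRuns of one DAG, including overridden ones."""

    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id
        self.runs: List[DagRun] = []
        self.task_instances: List[TaskInstance] = []
        self._next_id = 1

    def active(self) -> List[DagRun]:
        live = [r for r in self.runs if r.state != OVERRIDDEN]
        return sorted(live, key=lambda r: r.execution_date)

    def add_run(self, execution_date: datetime, run_type: str) -> DagRun:
        run = DagRun(self._next_id, self.dag_id, execution_date, run_type)
        self._next_id += 1
        # Simplification: any new run at an existing date overrides the old
        # one; the proposal describes this for backfills.
        for other in self.runs:
            if other.execution_date == execution_date and other.state != OVERRIDDEN:
                other.state = OVERRIDDEN          # keep the record for auditing
                for ti in self.task_instances:    # re-own its TaskInstances
                    if ti.dag_run_id == other.id:
                        ti.dag_run_id = run.id
        self.runs.append(run)
        self._relink()
        return run

    def _relink(self) -> None:
        # Rebuild "previous" pointers over the active runs in date order.
        prev_id = None
        for r in self.active():
            r.previous = prev_id
            prev_id = r.id
```

In this sketch, adding scheduled runs for 1 and 3 January and then a
backfill for 2 January leaves the 3 January run pointing at the backfill
as its previous run. A later backfill for 1 January leaves two records
with the same (dag_id, execution_date), one of them overridden, which is
the situation the primary-key question at the top of the thread is about.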
