All,

I have created AIRFLOW-20 to continue the discussion on the PR/Code. 

https://issues.apache.org/jira/browse/AIRFLOW-20

Bolke

> Op 28 apr. 2016, om 20:08 heeft Bolke de Bruin <[email protected]> het 
> volgende geschreven:
> 
> Ok PR-1431 now passes all tests for mysql. Postgresql/sqlite have some 
> alembic upgrade issues, that I don’t know yet how to solve. Basically, due to 
> the fact that a unique constraint is added and dropped in the same 
> transaction it does not work. So any ideas are welcome.
> 
> I have to revert my comment earlier on the deadlock issues not there anymore. 
> I got confused as a depend_on_past block is reported as a deadlock by the 
> backfill. I did update the tests to make sure there is a past to depend on 
> (and thus fail if required).
> 
> On the first issue you mention, Jeremiah, a new state for DagRuns 
> WAITING_FOR_PREVIOUS would be the way forward. This would allow you to walk 
> the timeline. With the PR detecting this kind “deadlock” is already very 
> simple.  For backfills, as a backfill job is currently immediate, the failure 
> should also be immediate I think: not deadlocked but reported as cannot 
> schedule as previous run not satisfied, which is easy now and we can even 
> report which root-run has not finished yet with WAITING_FOR_PREVIOUS.
> 
> On the second issue, with the PR this is no longer an issue. If the backfill 
> run is the first one it will know so, and just run. Hence this parameter 
> superfluous in my opinion. I also consider the parameter a workaround 
> currently.
> 
> Bolke
> 
>> Op 28 apr. 2016, om 14:11 heeft Jeremiah Lowin <[email protected]> het 
>> volgende geschreven:
>> 
>> The depends on past deadlock is the worst kind of deadlock (to catch in
>> code). Basically, if you're a DAG and you have "active" (runnable) tasks
>> but none of them have dependencies met, that's a deadlock. But if your
>> tasks depend_on_past we can't draw that conclusion since even though the
>> dependencies are unmet NOW they might be met in the future. However this
>> will still be an issue even with your change Bolke because let's say you
>> have 10 DagRuns active -- 9 of them will report that they can't start, but
>> we don't want to fail them because they are just waiting for past runs. So
>> I think the deadlock unit tests just need to change from looking at the
>> next run to looking 2+ runs out.
>> 
>> Ignore_first_depends_on_past is to allow arbitrary backfills of
>> depends_on_past tasks. In other words, if you want to backfill from A to B,
>> we want to make sure you can run the first one. It used to be that when you
>> ran a backfill, it overrode the start date of the task to trick the task in
>> to running on the first back filled date. Unfortunately that change didn't
>> survive into nested sub processes so it was a fragile hack and
>> ignore_first_depends_on_past replaced it. It's probably not ideal but it's
>> more robust.
>> On Thu, Apr 28, 2016 at 3:53 AM Bolke de Bruin <[email protected]> wrote:
>> 
>>> I just realized that an extra “benefit” (depending how you look at it) of
>>> this implementation is that many of the deadlock issues are gone.
>>> 
>>> * If there is no previous run, it will satisfy depend_on_past so the
>>> test_backfill_depends_on_past that checks for a deadlock (instead of maybe
>>> really checking for the dependency) will fail as there is no deadlock
>>> anymore.
>>> * test_cli_backfill_depends_on_past and test_dagrun_deadlock also fail as
>>> there is no deadlock anymore
>>> 
>>> Why does this not happen?
>>> 
>>> If a run detects it is the first run, it will start. This removes the
>>> requirement for ignore_first_depends_on_past (was this present for a real
>>> use case or a workaround for the scheduler?).
>>> 
>>> - Bolke
>>> 
>>>> Op 28 apr. 2016, om 09:08 heeft Bolke de Bruin <[email protected]> het
>>> volgende geschreven:
>>>> 
>>>> No I don’t think that covers it (although I get slightly confused by
>>> your comments "depends_on_past_schedule
>>>> “ and “depends_on_past_run”, but it seems too complex from the user’s
>>> perspective at first glance). The DagRun.previous handling is used to
>>> connect previous related DagRuns in order to make depends_on_past work with
>>> a moving interval or an automatically aligned start_date.
>>>> 
>>>> Imagine the following
>>>> 
>>>> Use Case 1
>>>> Dag1 has an assigned start_date "2016-04-24 00:00:00" with a cron
>>> schedule of “10 1 * * *”, ie run every day at 1.10am. Depends_on_past is
>>> true.
>>>> 
>>>> The current scheduler kicks off a run “2016-04-24 00:00:00” and then
>>> stops as the previous interval of “2016-04-24 01:10:00” is “2016-04-23
>>> 01:10:00”. This cannot be found as it has not taken place and thus the
>>> whole thing grinds to a halt and the TaskInstances refuse to run.
>>>> 
>>>> What the user expects here is that the first run is “2016-04-24
>>> 01:10:00”, ie start_date + interval, unless the start_date is on the
>>> interval, ie. start_date is first interval. This is what I address by
>>> start_date normalization in the PR. However, the second issue then kicks in
>>> as the “previous” run can still not be found.
>>>> 
>>>> Use Case 2
>>>> Dag2 has an assigned start_date "2016-04-24 01:10:00"  with a cron
>>> schedule of “10 1 * * *”. Depends_on_past is true.
>>>> 
>>>> The scheduler happily goes through a few runs, but then the dag is
>>> updated and the schedule adjusted. Because the previous interval cannot be
>>> found by the TaskInstance (emphasis), tasks get stuck again requiring an
>>> manual override.
>>>> 
>>>> What the user expects here is that the scheduler is smart enough to
>>> figure out that we are still running the same dag and that it needs to look
>>> up the previous run for that dag and make sure dependencies are met with
>>> that previous dagrun in mind.
>>>> 
>>>> I don’t think those two use cases are edge cases, considering the amount
>>> of questions we get on these subjects.
>>>> 
>>>> To resolve the remaining issues (aside from start_date normalization) I
>>> first made DagRun aware of its predecessors. Then I strengthened the
>>> relationship between TaskInstances and DagRuns slightly, by optionally
>>> including a dagrun_id in the TaskInstance. Now a TaskInstance can lookup
>>> its predecessors in the previous DagRun and know for sure that it is either
>>> the first run or it has a predecessor somewhere in time instead of guessing.
>>>> 
>>>> What I am unsure of is what to consider is the unit of work: 1) is a
>>> TaskInstance dependent on its past predecessor ignoring the outcome of the
>>> DagRun? (previous TaskInstance can be successful, but previous DagRun as a
>>> whole can fail) or 2) is it dependent on the outcome of the DagRun? 3) Can
>>> it be both? In case of 1 and 3 my logic needs to be updated slightly, but
>>> that should not be too much of a big deal. However I have difficulty
>>> imagining why you want to do that.
>>>> 
>>>> - Bolke
>>>> 
>>>> 
>>>>> Op 27 apr. 2016, om 18:17 heeft Maxime Beauchemin <
>>> [email protected]> het volgende geschreven:
>>>>> 
>>>>> Added a few comments on the PR:
>>>>> 
>>>>> ------------------------------------------
>>>>> 
>>>>> So if I understand right the previous DagRun.id handling is to account
>>> for
>>>>> DAGs that have a mixed workload of scheduled and triggered DagRuns
>>> right?
>>>>> 
>>>>> Do we have real world use cases for those? I always though a DAG would
>>>>> either be scheduled, or externally triggered, not a mix of both, but we
>>>>> never made that a real restriction, so I'm sure some people mix both.
>>>>> 
>>>>> If that is the case that we want to support both, the notion of
>>>>> depends_on_past become mushy, where maybe people may expect or desire
>>>>> either a depends_on_past_schedule or depends_on_past_run.
>>>>> 
>>>>> Since it's pretty edge-case (combo of both mixed scheduled AND
>>>>> depend_on_past=True) and unclear what the desired behavior might be, I'd
>>>>> vote to do something simple in terms of logic and document it clearly.
>>> It
>>>>> could be something like "if the DAG is scheduled, then we use the latest
>>>>> DagRun that is >= previous schedule, if not scheduled then we take the
>>>>> previous DagRun, whatever it is at that time.", this would not require
>>>>> handling the previous_dag_run_id
>>>>> 
>>>>> On Wed, Apr 27, 2016 at 6:02 AM, Bolke de Bruin <[email protected]>
>>> wrote:
>>>>> 
>>>>>> I have also given it some thought and some coding. I hope we agree that
>>>>>> from a user perspective
>>>>>> there currently is an issue. The need to align the start_date with the
>>>>>> interval is counter intuitive
>>>>>> and leads to a lot of questions and issue creation, although it is in
>>> the
>>>>>> documentation. If we are
>>>>>> able to fix this with none or little consequences for current setups
>>> that
>>>>>> should be preferred, I think.
>>>>>> The dependency explainer is really great work, but it doesn’t address
>>> the
>>>>>> core issue.
>>>>>> 
>>>>>> I do recognize the issue mentioned by Max and I agree with Jeremiah
>>> that
>>>>>> it is a separate issue,
>>>>>> however if we are to tackle the above it needs to be done in one go.
>>>>>> 
>>>>>> If you consider a DAG a description of cohesion between work items (in
>>> OOP
>>>>>> java terms
>>>>>> a class), then a DagRun is the instantiation of a DAG in time (in OOP
>>> java
>>>>>> terms an instance).
>>>>>> Tasks are then the description of a work item and a TaskInstance the
>>>>>> instantiation of the Task in time.
>>>>>> 
>>>>>> In my opinion issues pop up due to the current paradigm of considering
>>> the
>>>>>> TaskInstance
>>>>>> the smallest unit of work and asking it to maintain its own state in
>>>>>> relation to other TaskInstances
>>>>>> in a DagRun and in a previous DagRun of which it has no (real)
>>> perception.
>>>>>> Tasks are instantiated
>>>>>> by a cartesian product with the dates of DagRun instead of the DagRuns
>>>>>> itself.
>>>>>> 
>>>>>> The very loose coupling between DagRuns and TaskInstances can be
>>> improved
>>>>>> while maintaining
>>>>>> flexibility to run tasks without a DagRun. This would help with a
>>> couple
>>>>>> of things:
>>>>>> 
>>>>>> 1. start_date can be used as a ‘execution_date’ or a point in time
>>> when to
>>>>>> start looking
>>>>>> 2. a new interval for a dag will maintain depends_on_past
>>>>>> 3. paused dags do not give trouble
>>>>>> 4. tasks will be executed in order
>>>>>> 5. the ignore_first_depend_on_past could be removed as a task will now
>>>>>> know if it is really the first
>>>>>> 
>>>>>> In PR-1431 a lot of this work has been done by:
>>>>>> 
>>>>>> 1. Adding a “previous” field to a DagRun allowing it to connect to its
>>>>>> predecessor
>>>>>> 2. Adding a dag_run_id to TaskInstances so a TaskInstance knows about
>>> the
>>>>>> DagRun if needed
>>>>>> 3. Using start_date + interval as the first run date unless start_date
>>> is
>>>>>> on the interval then start_date is the first run date
>>>>>> 
>>>>>> The downside of the PR would be that TaskInstances without a DagRun
>>> will
>>>>>> not honor
>>>>>> depend_on_past, so the backfill would need to be adjusted to create a
>>>>>> DagRun
>>>>>> and properly connect it to a previous DagRun if that exists. Not too
>>> much
>>>>>> of a big
>>>>>> deal, I think - will work on that a bit as well in the PR.
>>>>>> 
>>>>>> So this should give at least an idea where things could head to. Any
>>>>>> thoughts?
>>>>>> 
>>>>>> Cheers
>>>>>> Bolke
>>>>>> 
>>>>>>> Op 26 apr. 2016, om 18:42 heeft Jeremiah Lowin <[email protected]>
>>> het
>>>>>> volgende geschreven:
>>>>>>> 
>>>>>>> On further thought, I understand the issue you're describing where
>>> this
>>>>>>> could lead to out-of-order runs. In fact, Bolke alerted me to the
>>>>>>> possibility earlier but I didn't make the connection! That feels like
>>> a
>>>>>>> separate issue -- to guarantee that tasks are executed in order (and
>>> more
>>>>>>> importantly that their database entries are created). I think the
>>>>>>> depends_on_past issue is related but separate -- though clearly needs
>>> to
>>>>>> be
>>>>>>> fleshed out for all cases :)
>>>>>>> 
>>>>>>> On Tue, Apr 26, 2016 at 12:28 PM Jeremiah Lowin <[email protected]>
>>>>>> wrote:
>>>>>>> 
>>>>>>>> I'm afraid I disagree -- though we may be talking about two different
>>>>>>>> issues. This issue deals specifically with how to identify the
>>> "past" TI
>>>>>>>> when evaluating "depends_on_past", and shouldn't be impacted by
>>> shifting
>>>>>>>> start_date, transparently or not.
>>>>>>>> 
>>>>>>>> Here are three valid examples of depends_on_past DAGs that would
>>> fail to
>>>>>>>> run with the current setup:
>>>>>>>> 
>>>>>>>> 1. A DAG with no schedule that is only run manually or via ad-hoc
>>>>>>>> backfill. Without a schedule_interval, depends_on_past will always
>>> fail
>>>>>>>> (since it looks back one schedule_interval).
>>>>>>>> 
>>>>>>>> 2. A DAG with a schedule, but that is sometimes run off-schedule.
>>> Let's
>>>>>>>> say a scheduled run succeeds and then an off-schedule run fails. When
>>>>>> the
>>>>>>>> next scheduled run starts, it shouldn't proceed because the most
>>> recent
>>>>>>>> task failed -- but it will look back one schedule_interval, jumping
>>> OVER
>>>>>>>> the most recent run, and decide it's ok to proceed.
>>>>>>>> 
>>>>>>>> 3. A DAG with a schedule that is paused for a while. This DAG could
>>> be
>>>>>>>> running perfectly fine, but if it is paused for a while and then
>>>>>> resumed,
>>>>>>>> its depends_on_past tasks will look back one schedule_interval and
>>> see
>>>>>>>> nothing, and therefore refuse to run.
>>>>>>>> 
>>>>>>>> So my proposal is simply that the depends_on_past logic looks back at
>>>>>> the
>>>>>>>> most recent task as opposed to rigidly assuming there is a task one
>>>>>>>> schedule_interval ago. For a regularly scheduled DAG, this will
>>> result
>>>>>> in
>>>>>>>> absolutely no behavior change. However it will robustly support a
>>> much
>>>>>>>> wider variety of cases like the ones I listed above.
>>>>>>>> 
>>>>>>>> J
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Tue, Apr 26, 2016 at 11:08 AM Maxime Beauchemin <
>>>>>>>> [email protected]> wrote:
>>>>>>>> 
>>>>>>>>>>>> "The clear fix seems to be to have depends_on_past check the
>>> last TI
>>>>>>>>> that
>>>>>>>>> ran, regardless of whether it ran `schedule_interval` ago. That's in
>>>>>> line
>>>>>>>>> with the intent of the flag. I will submit a fix."
>>>>>>>>> 
>>>>>>>>> I don't think so. This would lead to skipping runs, which
>>>>>> depends_on_past
>>>>>>>>> is used as a guarantee to run every TI, sequentially.
>>>>>>>>> 
>>>>>>>>> Absolute scheduling (cron expressions) is much better than relative
>>>>>>>>> scheduling (origin + interval). Though it's easy to make relative
>>>>>>>>> scheduling behave in an absolute way. You just have to use a rounded
>>>>>>>>> start_date to your schedule_interval, and not move things around.
>>>>>> Dynamic
>>>>>>>>> start_dates have always been a problem and should probably not be
>>>>>>>>> supported, though there's no way for us to tell.
>>>>>>>>> 
>>>>>>>>> Changing the schedule interval or the "origin time" is a bit tricky
>>> and
>>>>>>>>> should be documented.
>>>>>>>>> 
>>>>>>>>> If people use depend_on_past=True and change the origin or the
>>>>>> interval,
>>>>>>>>> they basically redefine what "past" actually means and will require
>>> to
>>>>>>>>> "mark success" or defining a new "start_date" as a way to say
>>> "please
>>>>>>>>> disregard depend_on_past for this date"
>>>>>>>>> 
>>>>>>>>> For those who haven't fully digested "What's the deal with
>>>>>> start_dates",
>>>>>>>>> please take the time to read it:
>>>>>>>>> http://pythonhosted.org/airflow/faq.html
>>>>>>>>> 
>>>>>>>>> Also see this part of the docs:
>>>>>>>>> 
>>>>>>>>> ​
>>>>>>>>> 
>>>>>>>>> Max
>>>>>>>>> 
>>>>>>>>> On Mon, Apr 25, 2016 at 1:14 PM, Jeremiah Lowin <[email protected]>
>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Bolke, Sid and I had a brief conversation to discuss some of the
>>>>>>>>>> implications of https://github.com/airbnb/airflow/issues/1427
>>>>>>>>>> 
>>>>>>>>>> There are two large points that need to be addressed:
>>>>>>>>>> 
>>>>>>>>>> 1. this particular issue arises because of an alignment issue
>>> between
>>>>>>>>>> start_date and schedule_interval. This can only happen with
>>> cron-based
>>>>>>>>>> schedule_intervals that describe absolute points in time (like
>>> “1am”)
>>>>>> as
>>>>>>>>>> opposed to time deltas (like “every hour”). Ironically, I once
>>>>>> reported
>>>>>>>>>> this same issue myself (#959). In the past (and in the docs) we
>>> have
>>>>>>>>>> simply
>>>>>>>>>> said that users must make sure the two params agree. We discussed
>>> the
>>>>>>>>>> possibility of a DAG validation method to raise an error if the
>>>>>>>>>> start_date
>>>>>>>>>> and schedule_interval don’t align, but Bolke made the point (and I
>>>>>>>>>> agreed)
>>>>>>>>>> that in these cases, start_date is sort of like telling the
>>> scheduler
>>>>>> to
>>>>>>>>>> “start paying attention” as opposed to “this is my first execution
>>>>>> date”.
>>>>>>>>>> In #1427, the scheduler was being asked to start paying attention
>>> on
>>>>>>>>>> 4/24/16 00:00:00 but not to do anything until 4/24/16 01:10:00.
>>>>>> However,
>>>>>>>>>> it
>>>>>>>>>> was scheduling a first run at midnight and a second run at 1:10.
>>>>>>>>>> 
>>>>>>>>>> Regardless of whether we choose to validate/warn/error, Bolke is
>>>>>> going to
>>>>>>>>>> change the scheduling logic so that the cron-based interval takes
>>>>>>>>>> precedence over a start date. Specifically, the first date on or
>>> after
>>>>>>>>>> the
>>>>>>>>>> start_date that complies with the schedule_interval becomes the
>>> first
>>>>>>>>>> execution date.
>>>>>>>>>> 
>>>>>>>>>> 2. Issue #1 led to a second issue: depends_on_past checks for a
>>>>>>>>>> successful
>>>>>>>>>> TI at `execution_date - schedule_interval`. This is fragile, since
>>> it
>>>>>> is
>>>>>>>>>> very possible for the previous TI to have run at any time in the
>>> past,
>>>>>>>>>> not
>>>>>>>>>> just one schedule_interval ago. This can happen easily with ad-hoc
>>> DAG
>>>>>>>>>> runs, and also if a DAG was paused for a while. Less commonly, it
>>>>>> happens
>>>>>>>>>> with the situation described in point #1, where the first scheduled
>>>>>> run
>>>>>>>>>> is
>>>>>>>>>> off-schedule (the midnight run followed by the daily 1:10am runs).
>>>>>>>>>> 
>>>>>>>>>> The clear fix seems to be to have depends_on_past check the last TI
>>>>>> that
>>>>>>>>>> ran, regardless of whether it ran `schedule_interval` ago. That's
>>> in
>>>>>> line
>>>>>>>>>> with the intent of the flag. I will submit a fix.
>>>>>>>>>> 
>>>>>>>>>> -J
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>> 
>>>>>> 
>>>> 
>>> 
>>> 
> 

Reply via email to