Hi David!

Thank you for clarifying, I think I understand your concern now. We
currently also work around this by making sure a dag is turned off
when we deploy a new version. We also make sure our jobs are
idempotent and retry-enabled in case we forget to turn off the job,
so the issue hasn't caused us too many headaches.

I do agree that it would be nice for Airflow to have the option to
guarantee a single version of a dag per dag run. I see two approaches:

(1) If a dag is updated, the current dagrun fails and/or retries.
(2) If a dag is updated, the current dagrun continues but uses the
version from before the update.

(1) requires some mechanism to compare dag generations. One option is
to hash the dag file, store that value in the dag_run table, and
compare against it each time a task runs. If the hash values differ,
update the stored hash, then fail/retry the dag. I think this is a
fairly safe approach.
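
A minimal sketch of what I mean (the dag_file_hash column on dag_run
is hypothetical, it doesn't exist today):

    import hashlib

    def dag_file_fingerprint(fileloc):
        # Hash the raw bytes of the dag file to get a stable
        # fingerprint of its current contents.
        with open(fileloc, "rb") as f:
            return hashlib.sha256(f.read()).hexdigest()

    def dag_changed_since_run_started(dag_run, fileloc):
        # Hypothetical check at task start: compare the fingerprint
        # stored on the dag_run row when the run was created against
        # the file on disk. A mismatch means the dag was updated
        # mid-run, so the run should fail (or retry).
        return dag_run.dag_file_hash != dag_file_fingerprint(fileloc)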

(2) is trickier. A dag only has a property "fileloc" which tracks the
location of the dag file; the actual content of the dag file is never
versioned. When a task instance starts running, it dynamically
re-processes the dag file specified by fileloc, generates all the
task objects from the dag file, and fetches the task object by
task_id in order to execute it. So to guarantee that each dagrun runs
a specific version, previous versions must be maintained on disk
somehow (maintaining this information in memory is difficult, since
if the scheduler/worker shuts down, that information is lost). This
makes it a pretty big change, and I haven't thought much about how to
implement it.
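
To illustrate, here is roughly what happens today when a task
instance runs (simplified, the real code path does more than this):

    from airflow.models import DagBag

    # These values would come from the task instance being run;
    # illustrative placeholders here.
    fileloc = "/airflow/dags/my_dag.py"
    dag_id, task_id = "my_dag", "my_task"

    # The worker re-parses whatever is currently on disk at fileloc...
    dagbag = DagBag(dag_folder=fileloc)
    dag = dagbag.get_dag(dag_id)

    # ...and fetches the task to execute from the freshly-built DAG,
    # so it always sees the latest file contents, not the version the
    # dagrun started with.
    task = dag.get_task(task_id)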

I'm personally leaning towards (1) for the sake of simplicity. Note
that some users may not want a dag to fail/retry when it is updated,
so this should be an optional feature, not required.
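
If we went this route, I'd imagine gating it behind a config option,
something like this (the option name is purely illustrative, nothing
like it exists today):

    from airflow.configuration import conf

    # Hypothetical opt-in flag (would need a default in the config
    # template); off by default to preserve today's behavior of
    # letting the dagrun continue on the new version.
    fail_on_dag_change = conf.getboolean("scheduler",
                                         "fail_dagrun_on_dag_change")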

My scheduler-foo isn't that great, so I'm curious what others have to
say about this.

On Fri, Feb 23, 2018 at 3:12 PM, David Capwell <dcapw...@gmail.com> wrote:
> Thanks for the reply Joy, let me walk you through things as they are today
>
> 1) we don't stop airflow or disable DAGs while deploying updates to logic,
> this is done live once it's released
> 2) the python script in the DAG folder doesn't actually have DAGs in it but
> is a shim layer to allow us to deploy in an atomic way for a single host
> (see the sketch below the directory structure)
>   2.1) this script reads a file on local disk (less than disk page size) to
> find the latest git commit deployed
>   2.2) re-does the airflow DAG load process but pointing to the git commit
> path
>
> Example directory structure
>
> /airflow/dags/shim.py
> /airflow/real_dags/
>                             /latest # pointer to latest commit
>                             /[git commit]/
>
> This is how we make sure deploys are consistent within a single task.
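>
> Roughly, the shim does something like this (simplified sketch, paths
> and names are illustrative):
>
>     import os
>
>     # /airflow/dags/shim.py: the only file Airflow scans; it reads
>     # the pointer file to find the latest deployed commit, then
>     # loads the real dag files from that commit's directory.
>     REAL_DAGS = "/airflow/real_dags"
>
>     with open(os.path.join(REAL_DAGS, "latest")) as f:
>         commit = f.read().strip()
>
>     dag_dir = os.path.join(REAL_DAGS, commit)
>     for name in os.listdir(dag_dir):
>         if name.endswith(".py"):
>             path = os.path.join(dag_dir, name)
>             with open(path) as f:
>                 # exec the real dag file; DAG objects defined there
>                 # land in globals() and get picked up by Airflow.
>                 exec(compile(f.read(), path, "exec"), globals())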
>
>
> Now, let's assume we have a fully atomic commit process and are able to
> upgrade DAGs at the exact same moment.
>
> At time T0 the scheduler knows of DAG V1 and schedules two tasks, Task1
> and Task2
> At time T1 Task1 is picked up by Worker1, which starts executing the task
> (V1 logic)
> At time T2 the deploy commit happens; current DAG version: V2
> At time T3 Task2 is picked up by Worker2, which starts executing the task
> (V2 logic)
>
> In many cases this isn't really a problem (a tuning config change to a
> hadoop job), but as more people use Airflow this is causing a lot of
> time spent debugging why production acted differently than expected (the
> problem was already fixed... why is it still here?).  We also see that some
> tasks expect a given behavior from other tasks, and since they live in the
> same git repo both tasks can be modified at the same time if a breaking
> change is needed; but when this rolls out to prod there isn't a way to do
> this other than turning off the DAG and logging in to all hosts to verify
> it's fully deployed.
>
> We would like to remove this confusion by exposing generations/versions
> (same thing really) to users and making sure that for a single dag_run
> only one version is used.
>
> I hope this is more clear.
>
> On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao <j...@wepay.com> wrote:
>
>> Hi David,
>>
>> Do you mind providing a concrete example of the scenario in which the
>> scheduler/workers see different states? (I'm not 100% sure I understood
>> the issue at hand.)
>>
>> And by same dag generation, are you referring to the dag version? (DAG
>> version is currently not supported at all, but I can see it being a
>> building block for future use cases).
>>
>> Joy
>>
>> On Fri, Feb 23, 2018 at 1:00 PM, David Capwell <dcapw...@gmail.com> wrote:
>>
>> > My current thinking is to add a field to the dag table that is optional
>> > and provided by the dag. We currently intercept the load path so we
>> > could use this field to make sure we load the same generation.  My
>> > concern here is the interaction with the scheduler; I'm not familiar
>> > enough with that logic to predict corner cases where this would fail.
>> >
>> > Any other recommendations for how this could be done?
>> >
>> > On Mon, Feb 19, 2018, 10:33 PM David Capwell <dcapw...@gmail.com> wrote:
>> >
>> > > We have been using airflow for logic that delegates to other systems,
>> > > so we inject a task that all tasks depend on to make sure all resources
>> > > used are the same for all tasks in the dag. This works well for tasks
>> > > that delegate to external systems, but people are starting to need to
>> > > run logic in airflow, and the fact that the scheduler and all workers
>> > > can see different states is causing issues.
>> > >
>> > > We can make sure that all the code is deployed in a consistent way but
>> > > need help from the scheduler to tell the workers the current generation
>> > > for a DAG.
>> > >
>> > > My question is, what would be the best way to modify airflow to allow
>> > > DAGs to define a generation value that the scheduler could send to
>> > > workers?
>> > >
>> > > Thanks
>> > >
>> >
>>
