Thanks for all the details! With a pluggable fetcher we would be able to
add our own logic for how to fetch, so it sounds like a good place to
start for something like this!

On Wed, Feb 28, 2018, 4:39 PM Joy Gao <[email protected]> wrote:

> +1 on DagFetcher abstraction, very airflow-esque :)
>
> On Wed, Feb 28, 2018 at 11:25 AM, Maxime Beauchemin
> <[email protected]> wrote:
> > Addressing a few of your questions / concerns:
> >
> > * The scheduler uses a multiprocess queue to queue up tasks; each
> > subprocess is in charge of a single DAG "scheduler cycle" which triggers
> > what it can for active DagRuns. Currently it fills the DagBag from the
> > local file system, looking for a specific module where the master
> > process last saw that DAG. Fetching the DAG is a metadata operation, DAG
> > artifacts shouldn't be too large, and we can assume that it takes
> > seconds at most to fetch a DAG, which is ok. We generally assume that
> > the scheduler should fully cycle every minute or so. A version-aware
> > DagFetcher could also implement some sort of caching if that was a
> > concern (shouldn't be though).
> > * For consistency within the whole DagRun, the scheduler absolutely has
> > to read the right version. If tasks got removed they would never get
> > scheduled and consistency cannot be achieved.
> > * TaskInstances get created the first time they are identified as
> > runnable by the scheduler and are born with a queued status I believe
> > (from memory, haven't read the latest code to confirm). The worker
> > double-checks and sets it as running as part of a database transaction
> > to avoid double-firing (a rough sketch of that check is below).
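> >
> > To make that last bullet concrete, here is a rough, simplified sketch of
> > the kind of double-check a worker can do; this is illustrative only, not
> > the actual Airflow code path, and the session setup is elided:
> >
> >     from airflow.models import TaskInstance
> >     from airflow.utils.state import State
> >
> >     def try_start(session, dag_id, task_id, execution_date):
> >         # Lock the row so two workers can't both flip it to RUNNING.
> >         ti = (
> >             session.query(TaskInstance)
> >             .filter_by(dag_id=dag_id, task_id=task_id,
> >                        execution_date=execution_date)
> >             .with_for_update()
> >             .one()
> >         )
> >         if ti.state != State.QUEUED:
> >             session.rollback()
> >             return False  # someone else got there first
> >         ti.state = State.RUNNING
> >         session.commit()
> >         return True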
> >
> > Max
> >
> > On Wed, Feb 28, 2018 at 7:29 AM, Chris Palmer <[email protected]> wrote:
> >
> >> I'll preface this with the fact that I'm relatively new to Airflow, and
> >> haven't played around with a lot of the internals.
> >>
> >> I find the idea of a DagFetcher interesting, but should we worry about
> >> slowing down the scheduler significantly? If the scheduler has to
> >> "fetch" multiple different DAG versions, be it git refs or artifacts
> >> from Artifactory, we are talking about adding significant time to each
> >> scheduler run. Also, how would the scheduler know which DAGs to fetch,
> >> and from where, if there aren't local files on disk listing those DAGs?
> >> Maybe I'm missing something in the implementation.
> >>
> >> It seems to me that the fetching of the different versions should be
> >> delegated to the Task (or TaskInstance) itself. That ensures we only
> >> spend the time to "fetch" the version that is needed, when it is
> >> needed. One downside might be that each TaskInstance running for the
> >> same version of the DAG might end up doing the "fetch" independently
> >> (duplicating that work).
> >>
> >> I think this could be done by adding some version attribute to the
> >> DagRun that gets set at creation, and having the scheduler pass that
> >> version to the TaskInstances when they are created. You could even
> >> extend this so that you could have an arbitrary set of
> >> "executor_parameters" that get set on a DagRun and are passed to
> >> TaskInstances. Then the specific Executor class that is running that
> >> TaskInstance could handle the "executor_parameters" as it sees fit.
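> >>
> >> Very roughly, what I'm picturing (the "version" and
> >> "executor_parameters" columns are made up, not existing Airflow
> >> fields):
> >>
> >>     from sqlalchemy import Column, Integer, String, Text
> >>     from sqlalchemy.ext.declarative import declarative_base
> >>
> >>     Base = declarative_base()
> >>
> >>     class DagRun(Base):  # stand-in for airflow.models.DagRun
> >>         __tablename__ = "dag_run"
> >>         id = Column(Integer, primary_key=True)
> >>         dag_id = Column(String(250))
> >>         # stamped once when the scheduler creates the run
> >>         version = Column(String(64), nullable=True)
> >>         # opaque blob that the Executor running the task interprets
> >>         executor_parameters = Column(Text, nullable=True)
> >>
> >>     # A TaskInstance would then fetch exactly the version it needs,
> >>     # when it needs it, e.g.:
> >>     #   dag = fetcher.fetch_dag(ti.dag_id, version=dag_run.version)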
> >>
> >> One thing I'm not clear on is how and when TaskInstances are created.
> >> When the scheduler first sees a specific DagRun, do all the
> >> TaskInstances get created immediately, but only some of them get
> >> queued? Or does the scheduler only create those TaskInstances which can
> >> be queued right now?
> >>
> >> In particular, if a DagRun gets created and, while it is running, the
> >> DAG is updated and a new Task is added, will the scheduler pick up that
> >> new Task for the running DagRun? If the answer is yes, then my
> >> suggestion above would run the risk of scheduling a Task for a DAG
> >> version where that Task didn't exist. I'm sure you could handle that
> >> somewhat gracefully but it's a bit ugly.
> >>
> >> Chris
> >>
> >> On Wed, Feb 28, 2018 at 2:05 AM, Maxime Beauchemin <
> >> [email protected]> wrote:
> >>
> >> > At a higher level I want to say a few things about the idea of
> >> > enforcing version consistency within a DagRun.
> >> >
> >> > One thing we've been talking about is the need for a "DagFetcher"
> >> > abstraction, whose first implementation, replacing and mimicking the
> >> > current behavior, would be a "FileSystemDagFetcher". A specific
> >> > DagFetcher implementation may or may not support version semantics,
> >> > but if it does, it should be able to receive a version id and return
> >> > the proper version of the DAG object. For instance, that first
> >> > "FileSystemDagFetcher" would not support version semantics, but
> >> > perhaps a "GitRepoDagFetcher" would, or an "ArtifactoryDagFetcher",
> >> > or a "TarballInS3DagFetcher" might as well.
> >> >
> >> > Of course that assumes that the scheduler knows and stores the active
> >> > version number when generating a new DagRun, and that this information
> >> > gets leveraged on subsequent scheduler cycles and on workers when
> >> > tasks are executed.
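> >> >
> >> > In pseudo-ish Python, with "dag_version" as a hypothetical column on
> >> > dag_run and "current_version" as a hypothetical DagFetcher method:
> >> >
> >> >     from airflow.models import DagRun
> >> >
> >> >     def create_dag_run(dag_id, execution_date, fetcher, session):
> >> >         # stamp the version that is current at trigger time
> >> >         run = DagRun(dag_id=dag_id, execution_date=execution_date)
> >> >         run.dag_version = fetcher.current_version(dag_id)
> >> >         session.add(run)
> >> >         session.commit()
> >> >         return run
> >> >
> >> >     def dag_for(task_instance, fetcher, session):
> >> >         # scheduler cycles and workers resolve the stamped version
> >> >         run = task_instance.get_dagrun(session)
> >> >         return fetcher.fetch_dag(task_instance.dag_id,
> >> >                                  version=run.dag_version)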
> >> >
> >> > This could also enable things like "remote" backfills (non-local,
> >> > parallelized) of a DAG definition that's on an arbitrary git ref
> >> > (assuming a "GitRepoDagFetcher").
> >> >
> >> > There are [perhaps] unintuitive implications where clearing a single
> >> > task would then re-run the old DAG definition on that task (since the
> >> > version was stamped in the DagRun and hasn't changed), but
> >> > deleting/recreating a DagRun would run the latest version (or any
> >> > other version that may be specified for that matter).
> >> >
> >> > I'm unclear on how much work that represents exactly, but it's
> >> > certainly doable and may only require changing part of the DagBag
> >> > class and a few other places.
> >> >
> >> > Max
> >> >
> >> > On Tue, Feb 27, 2018 at 6:48 PM, David Capwell <[email protected]> wrote:
> >> >
> >> > > Thanks for your feedback!
> >> > >
> >> > > Option 1 is a non-starter for us. The reason is we have DAGs that
> >> > > take 9+ hours to run.
> >> > >
> >> > > Option 2 is more where my mind was going, but it's rather large. How
> >> > > I see it, you need an MVCC DagBag that's aware of multiple versions
> >> > > (what provides the version?). Assuming you can track which versions
> >> > > active dag runs point to, you know how to clean up (fine with that
> >> > > being external). The pro here is you have snapshot isolation for a
> >> > > dag_run; the con is more bookkeeping and requiring the deploy
> >> > > process to work with this (that last part may be a good thing
> >> > > though).
> >> > >
> >> > > The only other option I can think of is to lock deploys so the
> >> > > system only picks up new versions when no dag_run holds the lock.
> >> > > This is flawed for many reasons, but breaks horribly for dag_runs
> >> > > that take minutes (I assume 99% do).
> >> > >
> >> > >
> >> > >
> >> > > On Tue, Feb 27, 2018, 4:50 PM Joy Gao <[email protected]> wrote:
> >> > >
> >> > > > Hi David!
> >> > > >
> >> > > > Thank you for clarifying, I think I understand your concern now.
> >> > > > We currently also work around this by making sure a dag is turned
> >> > > > off when we deploy a new version. We also make sure our jobs are
> >> > > > idempotent and retry-enabled in case we forget to turn off the
> >> > > > job, so the issue hasn't caused us too much headache.
> >> > > >
> >> > > > I do agree that it would be nice for Airflow to have the option
> >> > > > to guarantee a single version of a dag per dag run. I see two
> >> > > > approaches:
> >> > > >
> >> > > > (1) If a dag is updated, the current dagrun fails and/or retries.
> >> > > > (2) If a dag is updated, the current dagrun continues but uses the
> >> > > > version from before the update.
> >> > > >
> >> > > > (1) requires some mechanism to compare dag generations. One
> >> > > > option is to hash the dag file, store that value in the dagrun
> >> > > > table, and compare against it each time a task runs. If the hash
> >> > > > value is different, update the hash value, then fail/retry the
> >> > > > dag. I think this is a fairly safe approach.
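> >> > > >
> >> > > > A minimal sketch of that idea (the "dag_hash" column is made up,
> >> > > > and this glosses over where exactly the check would hook in):
> >> > > >
> >> > > >     import hashlib
> >> > > >
> >> > > >     def dag_file_fingerprint(fileloc):
> >> > > >         with open(fileloc, "rb") as f:
> >> > > >             return hashlib.sha256(f.read()).hexdigest()
> >> > > >
> >> > > >     def dag_unchanged(dag_run, dag, session):
> >> > > >         current = dag_file_fingerprint(dag.fileloc)
> >> > > >         if dag_run.dag_hash is None:
> >> > > >             dag_run.dag_hash = current
> >> > > >             session.commit()
> >> > > >             return True
> >> > > >         if dag_run.dag_hash != current:
> >> > > >             dag_run.dag_hash = current
> >> > > >             session.commit()
> >> > > >             return False  # caller would fail/retry the dagrun
> >> > > >         return True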
> >> > > >
> >> > > > (2) is trickier. A dag only has a property "fileloc" which tracks
> >> > > > the location of the dag file, but the actual content of the dag
> >> > > > file is never versioned. When a task instance starts running, it
> >> > > > dynamically re-processes the dag file specified by the fileloc,
> >> > > > generates all the task objects from the dag file, and fetches the
> >> > > > task object by task_id in order to execute it. So in order to
> >> > > > guarantee that each dagrun runs a specific version, previous
> >> > > > versions must be maintained on disk somehow (maintaining this
> >> > > > information in memory is difficult, since if the scheduler/worker
> >> > > > shuts down, that information is lost). This makes it a pretty big
> >> > > > change, and I haven't thought much about how to implement it.
> >> > > >
> >> > > > I'm personally leaning towards (1) for the sake of simplicity.
> >> > > > Note that some users may not want the dag to fail/retry even when
> >> > > > the dag is updated, so this should be an optional feature, not
> >> > > > required.
> >> > > >
> >> > > > My scheduler-foo isn't that great, so curious what others have to
> >> > > > say about this.
> >> > > >
> >> > > > On Fri, Feb 23, 2018 at 3:12 PM, David Capwell <[email protected]> wrote:
> >> > > > > Thanks for the reply Joy, let me walk you through things as
> >> > > > > they are today.
> >> > > > >
> >> > > > > 1) we don't stop airflow or disable DAGs while deploying
> >> > > > > updates to logic; this is done live once it's released
> >> > > > > 2) the python script in the DAG folder doesn't actually have
> >> > > > > DAGs in it but is a shim layer that allows us to deploy in an
> >> > > > > atomic way for a single host
> >> > > > >   2.1) this script reads a file on local disk (less than a disk
> >> > > > > page in size) to find the latest git commit deployed
> >> > > > >   2.2) re-does the airflow DAG load process but pointing at the
> >> > > > > git commit path
> >> > > > >
> >> > > > > Example directory structure:
> >> > > > >
> >> > > > > /airflow/dags/shim.py
> >> > > > > /airflow/real_dags/
> >> > > > >     /latest         # pointer to latest commit
> >> > > > >     /[git commit]/
> >> > > > >
> >> > > > > This is how we make sure deploys are consistent within a single
> >> > > > > task.
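> >> > > > >
> >> > > > > Roughly, the shim amounts to something like this (paths and
> >> > > > > names simplified):
> >> > > > >
> >> > > > >     # /airflow/dags/shim.py
> >> > > > >     import os
> >> > > > >
> >> > > > >     from airflow.models import DagBag
> >> > > > >
> >> > > > >     DAG_ROOT = "/airflow/real_dags"
> >> > > > >
> >> > > > >     def _latest_commit():
> >> > > > >         # tiny file holding the last deployed git commit
> >> > > > >         with open(os.path.join(DAG_ROOT, "latest")) as f:
> >> > > > >             return f.read().strip()
> >> > > > >
> >> > > > >     # re-run the normal DAG load over the versioned directory
> >> > > > >     # and re-export every DAG so airflow finds it in this module
> >> > > > >     _bag = DagBag(os.path.join(DAG_ROOT, _latest_commit()))
> >> > > > >     for _dag_id, _dag in _bag.dags.items():
> >> > > > >         globals()[_dag_id] = _dag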
> >> > > > >
> >> > > > >
> >> > > > > Now, let's assume we have a fully atomic commit process and are
> >> > > > > able to upgrade DAGs at the exact same moment.
> >> > > > >
> >> > > > > At time T0 the scheduler knows of DAG V1 and schedules two
> >> > > > > tasks, Task1 and Task2
> >> > > > > At time T1 Task1 is picked up by Worker1, so starts executing
> >> > > > > the task (V1 logic)
> >> > > > > At time T2 the deploy commit happens; current DAG version: V2
> >> > > > > At time T3 Task2 is picked up by Worker2, so starts executing
> >> > > > > the task (V2 logic)
> >> > > > >
> >> > > > > In many cases this isn't really a problem (a tuning config
> >> > > > > change to a hadoop job), but as we have more people using
> >> > > > > Airflow this is causing a lot of time spent debugging why
> >> > > > > production acted differently than expected (the problem was
> >> > > > > already fixed... why is it still here?). We also see that some
> >> > > > > tasks expect a given behavior from other tasks, and since they
> >> > > > > live in the same git repo they can modify both tasks at the same
> >> > > > > time if a breaking change is needed, but when this rolls out to
> >> > > > > prod there isn't a way to do this other than turning off the DAG
> >> > > > > and logging in to all hosts to verify it's fully deployed.
> >> > > > >
> >> > > > > We would like to remove this confusion, expose
> >> > > > > generations/versions (same thing really) to users, and make
> >> > > > > sure that for a single dag_run only one version is used.
> >> > > > >
> >> > > > > I hope this is more clear.
> >> > > > >
> >> > > > > On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao <[email protected]> wrote:
> >> > > > >
> >> > > > >> Hi David,
> >> > > > >>
> >> > > > >> Do you mind providing a concrete example of the scenario in
> >> > > > >> which the scheduler/workers see different states? (I'm not 100%
> >> > > > >> sure I understood the issue at hand.)
> >> > > > >>
> >> > > > >> And by "same dag generation", are you referring to the dag
> >> > > > >> version? (DAG versioning is currently not supported at all, but
> >> > > > >> I can see it being a building block for future use cases.)
> >> > > > >>
> >> > > > >> Joy
> >> > > > >>
> >> > > > >> On Fri, Feb 23, 2018 at 1:00 PM, David Capwell <[email protected]> wrote:
> >> > > > >>
> >> > > > >> > My current thinking is to add a field to the dag table that
> >> > > > >> > is optional and provided by the dag. We currently intercept
> >> > > > >> > the load path, so we could use this field to make sure we
> >> > > > >> > load the same generation. My concern here is the interaction
> >> > > > >> > with the scheduler; I'm not familiar enough with that logic
> >> > > > >> > to predict corner cases where this would fail.
> >> > > > >> >
> >> > > > >> > Any other recommendations for how this could be done?
> >> > > > >> >
> >> > > > >> > On Mon, Feb 19, 2018, 10:33 PM David Capwell <[email protected]> wrote:
> >> > > > >> >
> >> > > > >> > > We have been using airflow for logic that delegates to
> >> > > > >> > > other systems, so we inject a task that all tasks depend on
> >> > > > >> > > to make sure all resources used are the same for all tasks
> >> > > > >> > > in the dag. This works well for tasks that delegate to
> >> > > > >> > > external systems, but people are starting to need to run
> >> > > > >> > > logic in airflow, and the fact that the scheduler and all
> >> > > > >> > > workers can see different states is causing issues.
> >> > > > >> > >
> >> > > > >> > > We can make sure that all the code is deployed in a
> >> > > > >> > > consistent way, but we need help from the scheduler to tell
> >> > > > >> > > the workers the current generation for a DAG.
> >> > > > >> > >
> >> > > > >> > > My question is: what would be the best way to modify
> >> > > > >> > > airflow to allow DAGs to define a generation value that the
> >> > > > >> > > scheduler could send to workers?
> >> > > > >> > >
> >> > > > >> > > Thanks
> >> > > > >> > >
> >> > > > >> >
> >> > > > >>
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
>
>
