A few notes about the pickling topic to answer William:
* first reason why we can't use pickles: Jinja template objects are not
picklable. There's a hack to pickle the content of the template instead of
the object, but that breaks Jinja inheritance and imports
* pickles are messy, and Airflow allows users to attach objects to DAG
objects (on_error_callable, on_retry_callable, params, ...), so pickling
goes down the recursive rabbit hole and sometimes serializes large chunks
of what's in `sys.modules` (this could probably be mitigated; see the quick
sketch below)
* pickle is a messy serialization format with lots of drawbacks: security
issues, incompatibility between py2 and py3, ...
* pickles have a bad reputation; many people have advised avoiding them like
the plague since the feature was first built
* the original approach of pickling DAGs to the db is kind of a hack
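
For anyone who hasn't hit these first-hand, here's a minimal sketch of the
first two points (illustrative only, not Airflow code; the DAG stand-in and
the numbers are made up):

import pickle
from jinja2 import Template

# 1) compiled templates carry code objects / render functions, so they
#    don't survive pickling
try:
    pickle.dumps(Template("echo {{ ds }}"))
except Exception as e:
    print("jinja: %s: %s" % (type(e).__name__, e))

# 2) pickling serializes everything reachable from the DAG object, so any
#    object a user attaches (params, callbacks' closures, ...) comes along
class FakeDag(object):
    def __init__(self, params):
        self.params = params

payload = pickle.dumps(FakeDag({"lookup": list(range(1000000))}))
print("%.1f MB for one 'DAG'" % (len(payload) / 1e6))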

I also agree that caching is probably required, especially around large DAGs,
and for "semi-stateless" web servers to operate properly.

Max


On Thu, Mar 1, 2018 at 1:15 PM, David Capwell <dcapw...@gmail.com> wrote:

> We need two versions but most likely would not use either... those being
> Artifactory and git (would really love for this to be pluggable!!!!!)
>
> We have our own dag fetch logic which right now pulls from git, caches,
> then redirects airflow to that directory.  For us, airflow is automated
> so you push a button to get a cluster; for this reason there are enough
> instances that we have DDoS-attacked git (oops).
>
> We are planning to change this to fetch from Artifactory, and have a
> stateful proxy for each cluster so we stop DDoS-attacking core
> infrastructure.
>
> On Mar 1, 2018 11:45 AM, "William Wong" <wongwil...@gmail.com> wrote:
>
> Also relatively new to Airflow here. Same as David above, Option 1 is not
> an option for us either for the same reasons.
>
> What I would like to see is that it can be user selectable / modifiable.
>
> Use Case:
> We have a DAG with thousands of tasks/task dependencies. After 24 hours of
> progress, we need to take a subset of those tasks and rerun them with a
> different configuration (reasons range from incorrect parameters to
> infrastructure issues; it doesn't really matter here).
>
> What I hope can happen:
> 1. Pause DAG
> 2. Upload and tag newest dag version
> 3. Set dag_run to use the latest tag.
> 4. Resolve DAG sync using <insert smart diff behavior here that is clearly
> defined/documented>
> 5. Unpause DAG
>
> I do like the DagFetcher idea. This logic should slot in nicely in the
> DagBag code. Maxime, I also vote for the GitDagFetcher. Two thoughts about
> the GitDagFetcher:
> - I probably won't use fuse across 100s of nodes in my k8s/swarm, and I'm
> not sure how this would work without too much trouble.
> - It might be confusing if some git SHAs have no changes to a DAG. Will all
> existing runs be marked as outdated? Probably better than nothing anyway.
>
> I also vote to have some form of caching behavior. I prefer not to read in
> DAGs all the time, i.e. from the webserver, scheduler, *and* all workers,
> over and over again before starting any task. This is because,
> unfortunately, the assumption that a DAG only takes seconds to load does
> not hold true for large DAGs. With only 10k tasks within a DAG it's already
> on the order of minutes, which would be untenable as we scale up to even
> larger DAGs. (Though I'm testing a fix for this, so maybe it won't actually
> be an issue anymore.)
>
> FWIW, it seems to me that the DagPickle feature (which, for the love of me,
> I can't seem to get to work; no wonder it's being deprecated) would have
> solved a lot of these issues fairly easily. Something along the lines of
> adding pickle_id to dag_run should at least help the scheduler identify the
> DAG version to load and queue. But I'm not sure if it can delete
> out-of-sync task instances.
>
> Lastly, sorry for the brain dump and for derailing the topic, but for the
> workers, it seems that importing/loading the whole DAG just to execute a
> single task is a bit of overkill, isn't it? If we kept a caching feature
> (i.e. pickling), perhaps we could simply cache the task and not worry about
> the rest of the DAG's tasks?
>
> Will
>
> On Thu, Mar 1, 2018 at 11:30 AM, Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
>
> > I'm curious to hear which DagFetcher abstraction people would build or
> > want to use.
> >
> > So far it sounded like the most popular and flexible approach would be a
> > `GitDagFetcher`, where all SHAs and refs become a possibility, as opposed
> > to, say, a TarballOnS3DagFetcher, which would require more manual artifact
> > management and versioning, and which represents additional [human]
> > workflow on top of the already existing git-based workflow.
> >
> > One way I've seen this done before is by using a Git fuse (filesystem in
> > userspace) hack that creates a virtual filesystem where all SHAs and refs
> > in the Git repo are exposed as subfolders, and under each ref subfolder
> > the whole repo sits as of that ref. Of course all the files are virtual
> > and fetched at access time by the virtual filesystem using the git api.
> > So if you simply point the DagBag loader to the right [virtual]
> > directory, it will import the right version of the DAG. In the git world,
> > the alternative to that is managing temp folders and doing shallow
> > clones, which seems like much more of a headache. Note that one tradeoff
> > is that git, and whatever it depends on, then needs to be highly
> > available.
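> >
> > To make that concrete, here is a rough sketch of what the lookup could
> > look like on top of such a fuse mount (the mount point, layout and names
> > are made up; nothing like this exists in Airflow today):
> >
> > # hypothetical: with a git fuse mount, "checking out" a version is just
> > # a path lookup into the virtual filesystem
> > import os
> >
> > MOUNT = "/mnt/dag-repo"  # one subfolder per SHA/ref, e.g. /mnt/dag-repo/<sha>
> >
> > def dag_folder_for(ref):
> >     path = os.path.join(MOUNT, ref, "dags")
> >     if not os.path.isdir(path):
> >         raise ValueError("unknown ref %r" % ref)
> >     return path
> >
> > # then point the DagBag loader at the right virtual directory, e.g.
> > # DagBag(dag_folder=dag_folder_for(dag_run_version)).get_dag(dag_id)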
> >
> > Max
> >
> > On Wed, Feb 28, 2018 at 6:55 PM, David Capwell <dcapw...@gmail.com> wrote:
> >
> > > Thanks for all the details! With a pluggable fetcher we would be able
> > > to add our own logic for how to fetch, so this sounds like a good place
> > > to start for something like this!
> > >
> > > On Wed, Feb 28, 2018, 4:39 PM Joy Gao <j...@wepay.com> wrote:
> > >
> > > > +1 on DagFetcher abstraction, very airflow-esque :)
> > > >
> > > > On Wed, Feb 28, 2018 at 11:25 AM, Maxime Beauchemin
> > > > <maximebeauche...@gmail.com> wrote:
> > > > > Addressing a few of your questions / concerns:
> > > > >
> > > > > * The scheduler uses a multiprocess queue to queue up tasks; each
> > > > > subprocess is in charge of a single DAG "scheduler cycle", which
> > > > > triggers what it can for active DagRuns. Currently it fills the
> > > > > DagBag from the local file system, looking for a specific module
> > > > > where the master process last saw that DAG. Fetching the DAG is a
> > > > > metadata operation; DAG artifacts shouldn't be too large, so we can
> > > > > assume that it takes seconds at most to fetch a DAG, which is ok. We
> > > > > generally assume that the scheduler should fully cycle every minute
> > > > > or so. A version-aware DagFetcher could also implement some sort of
> > > > > caching if that was a concern (it shouldn't be, though).
> > > > > * For consistency within the whole DagRun, the scheduler absolutely
> > > > > has to read the right version. If tasks got removed they would never
> > > > > get scheduled and consistency could not be achieved.
> > > > > * TaskInstances get created the first time they are identified as
> > > > > runnable by the scheduler and are born with a queued status, I
> > > > > believe (from memory, haven't read the latest code to confirm). The
> > > > > worker double-checks and sets it as running as part of a database
> > > > > transaction to avoid double-firing.
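> > > > >
> > > > > As a much-simplified illustration of that last point (toy model only;
> > > > > the real logic lives in TaskInstance and does quite a bit more), the
> > > > > handoff is essentially a locked check-and-set:
> > > > >
> > > > > from sqlalchemy import Column, String
> > > > > from sqlalchemy.ext.declarative import declarative_base
> > > > >
> > > > > Base = declarative_base()
> > > > >
> > > > > class TI(Base):  # stand-in for the task_instance table
> > > > >     __tablename__ = "task_instance"
> > > > >     task_id = Column(String, primary_key=True)
> > > > >     state = Column(String)
> > > > >
> > > > > def claim(session, task_id):
> > > > >     row = (session.query(TI)
> > > > >            .filter_by(task_id=task_id)
> > > > >            .with_for_update()  # row lock so two workers can't both win
> > > > >            .one())
> > > > >     if row.state != "queued":
> > > > >         return False  # someone else already started it
> > > > >     row.state = "running"
> > > > >     session.commit()
> > > > >     return True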
> > > > >
> > > > > Max
> > > > >
> > > > > On Wed, Feb 28, 2018 at 7:29 AM, Chris Palmer <ch...@crpalmer.com>
> > > > wrote:
> > > > >
> > > > >> I'll preface this with the fact that I'm relatively new to
> Airflow,
> > > and
> > > > >> haven't played around with a lot of the internals.
> > > > >>
> > > > >> I find the idea of a DagFetcher interesting but would we worry
> about
> > > > >> slowing down the scheduler significantly? If the scheduler is
> having
> > > to
> > > > >> "fetch" multiple different DAG versions, be it git refs or
> artifacts
> > > > from
> > > > >> Artifactory, we are talking about adding significant time to each
> > > > scheduler
> > > > >> run. Also how would the scheduler know which DAGs to fetch from
> > where
> > > if
> > > > >> there aren't local files on disk listing those DAGs? Maybe I'm
> > missing
> > > > >> something in the implementation.
> > > > >>
> > > > >> It seems to me that the fetching of the different versions should
> be
> > > > >> delegated to the Task (or TaskInstance) itself. That ensures we
> only
> > > > spend
> > > > >> the time to "fetch" the version that is needed when it is needed.
> > One
> > > > down
> > > > >> side might be that each TaskInstance running for the same version
> of
> > > the
> > > > >> DAG might end up doing the "fetch" independently (duplicating that
> > > > work).
> > > > >>
> > > > >> I think this could be done by adding some version attribute to the
> > > > DagRun
> > > > >> that gets set at creation, and have the scheduler pass that
> version
> > to
> > > > the
> > > > >> TaskInstances when they are created. You could even extend this so
> > > that
> > > > you
> > > > >> could have an arbitrary set of "executor_parameters" that get set
> > on a
> > > > >> DagRun and are passed to TaskInstances. Then the specific Executor
> > > class
> > > > >> that is running that TaskInstance could handle the
> > > > "executor_parameters" as
> > > > >> it sees fit.
> > > > >>
> > > > >> One thing I'm not clear on is how and when TaskInstances are
> > created.
> > > > When
> > > > >> the scheduler first sees a specific DagRun do all the
> TaskInstances
> > > get
> > > > >> created immediately, but only some of them get queued? Or does the
> > > > >> scheduler only create those TaskInstances which can be queued
> right
> > > now?
> > > > >>
> > > > >> In particular if a DagRun gets created and while it is running the
> > DAG
> > > > is
> > > > >> updated and a new Task is added, will the scheduler pick up that
> new
> > > > Task
> > > > >> for the running DagRun? If the answer is yes, then my suggestion
> > above
> > > > >> would run the risk of scheduling a Task for a DAG version where
> that
> > > > Task
> > > > >> didn't exist. I'm sure you could handle that somewhat gracefully
> but
> > > > it's a
> > > > >> bit ugly.
> > > > >>
> > > > >> Chris
> > > > >>
> > > > >> On Wed, Feb 28, 2018 at 2:05 AM, Maxime Beauchemin <
> > > > >> maximebeauche...@gmail.com> wrote:
> > > > >>
> > > > >> > At a higher level I want to say a few things about the idea of
> > > > enforcing
> > > > >> > version consistency within a DagRun.
> > > > >> >
> > > > >> > One thing we've been talking about is the need for a "DagFetcher"
> > > > >> > abstraction, whose first implementation, replacing and mimicking
> > > > >> > the current behavior, would be a "FileSystemDagFetcher". A
> > > > >> > specific DagFetcher implementation may or may not support version
> > > > >> > semantics, but if it does, it should be able to receive a version
> > > > >> > id and return the proper version of the DAG object. For instance
> > > > >> > that first "FileSystemDagFetcher" would not support version
> > > > >> > semantics, but perhaps a "GitRepoDagFetcher" would, and an
> > > > >> > "ArtifactoryDagFetcher" or "TarballInS3DagFetcher" may as well.
> > > > >> >
> > > > >> > Of course that assumes that the scheduler knows and stores the
> > > active
> > > > >> > version number when generating a new DagRun, and for that
> > > information
> > > > to
> > > > >> be
> > > > >> > leveraged on subsequent scheduler cycles and on workers when
> task
> > > are
> > > > >> > executed.
> > > > >> >
> > > > >> > This could also enable things like "remote" backfills (non
> local,
> > > > >> > parallelized) of a DAG definition that's on an arbitrary git ref
> > > > >> (assuming
> > > > >> > a "GitRepoDagFetcher").
> > > > >> >
> > > > >> > There are [perhaps] unintuitive implications where clearing a
> > single
> > > > task
> > > > >> > would then re-run the old DAG definition on that task (since the
> > > > version
> > > > >> > was stamped in the DagRun and hasn't changed), but
> > > > deleting/recreating a
> > > > >> > DagRun would run the latest version (or any other version that
> may
> > > be
> > > > >> > specified for that matter).
> > > > >> >
> > > > >> > I'm unclear on how much work that represents exactly, but it's
> > > > certainly
> > > > >> > doable and may only require to change part of the DagBag class
> > and a
> > > > few
> > > > >> > other places.
> > > > >> >
> > > > >> > Max
> > > > >> >
> > > > >> > On Tue, Feb 27, 2018 at 6:48 PM, David Capwell <
> > dcapw...@gmail.com>
> > > > >> wrote:
> > > > >> >
> > > > >> > > Thanks for your feedback!
> > > > >> > >
> > > > >> > > Option 1 is a non-starter for us. The reason is we have DAGs
> > that
> > > > take
> > > > >> 9+
> > > > >> > > hours to run.
> > > > >> > >
> > > > >> > > Option 2 is more where my mind was going, but it's rather
> large.
> > > > How I
> > > > >> > see
> > > > >> > > it you need a MVCC DagBag that's aware of multiple versions
> > (what
> > > > >> > provides
> > > > >> > > version?).  Assuming you can track active dag runs pointing to
> > > which
> > > > >> > > versions you know how to cleanup (fine with external).  The
> pro
> > > > here is
> > > > >> > you
> > > > >> > > have snapshot isolation for dag_run, con is more bookkeeping
> and
> > > > >> require
> > > > >> > > deploy to work with this (last part may be a good thing
> though).
> > > > >> > >
> > > > >> > > The only other option I can think of is to lock deploy so the
> > > system
> > > > >> only
> > > > >> > > picks up new versions when no dag_run holds the lock.  This is
> > > > flawed
> > > > >> for
> > > > >> > > many reasons, but breaks horrible for dag_runs that takes
> > minutes
> > > (I
> > > > >> > assume
> > > > >> > > 99% do).
> > > > >> > >
> > > > >> > >
> > > > >> > >
> > > > >> > > On Tue, Feb 27, 2018, 4:50 PM Joy Gao <j...@wepay.com> wrote:
> > > > >> > >
> > > > >> > > > Hi David!
> > > > >> > > >
> > > > >> > > > Thank you for clarifying, I think I understand your concern
> > now.
> > > > We
> > > > >> > > > currently also work around this by making sure a dag is
> turned
> > > off
> > > > >> > > > when we deploy a new version. We also make sure our jobs are
> > > > >> > > > idempotent and retry-enabled in the case when we forget to
> > turn
> > > > off
> > > > >> > > > the job, so the issue hasn't caused us too much headache.
> > > > >> > > >
> > > > >> > > > I do agree that it would be nice for Airflow to have the
> > option
> > > to
> > > > >> > > > guarantee a single version of dag per dag run. I see two
> > > > approaches:
> > > > >> > > >
> > > > >> > > > (1) If a dag is updated, the current dagrun fails and/or
> > > retries.
> > > > >> > > > (2) If a dag is updated, the current dagrun continues but
> uses
> > > > >> version
> > > > >> > > > before the update.
> > > > >> > > >
> > > > >> > > > (1) requires some mechanism to compare dag generations. One
> > > > >> > > > option is to hash the dag file, store that value in the dagrun
> > > > >> > > > table, and compare against it each time a task is running. In
> > > > >> > > > the case where the hash value is different, update the hash
> > > > >> > > > value, then fail/retry the dag. I think this is a fairly safe
> > > > >> > > > approach.
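> > > > >> > > >
> > > > >> > > > Roughly, (1) could look something like this (the dag_hash column
> > > > >> > > > and where the check hooks in are made up for illustration):
> > > > >> > > >
> > > > >> > > > import hashlib
> > > > >> > > >
> > > > >> > > > def dag_file_fingerprint(fileloc):
> > > > >> > > >     with open(fileloc, "rb") as f:
> > > > >> > > >         return hashlib.sha256(f.read()).hexdigest()
> > > > >> > > >
> > > > >> > > > def generation_changed(dag_run, dag):
> > > > >> > > >     """True if the dag file changed since this run stamped it."""
> > > > >> > > >     current = dag_file_fingerprint(dag.fileloc)
> > > > >> > > >     if getattr(dag_run, "dag_hash", None) is None:
> > > > >> > > >         dag_run.dag_hash = current  # first task stamps the hash
> > > > >> > > >         return False
> > > > >> > > >     if dag_run.dag_hash != current:
> > > > >> > > >         dag_run.dag_hash = current  # record the new version...
> > > > >> > > >         return True                 # ...caller fails/retries
> > > > >> > > >     return False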
> > > > >> > > >
> > > > >> > > > (2) is trickier. A dag only has a property "fileloc" which
> > > tracks
> > > > the
> > > > >> > > > location of the dag file, but the actual content of the dag
> > file
> > > > is
> > > > >> > > > never versioned. When a task instance starts running, it
> > > > dynamically
> > > > >> > > > re-processes the dag file specified by the fileloc, generate
> > all
> > > > the
> > > > >> > > > task objects from the dag file, and fetch the task object by
> > > > task_id
> > > > >> > > > in order to execute it. So in order to guarantee each dagrun
> > to
> > > > run a
> > > > >> > > > specific version, previous versions must be maintained on
> disk
> > > > >> somehow
> > > > >> > > > (maintaining this information in memory is difficult, since
> if
> > > the
> > > > >> > > > scheduler/worker shuts down, that information is lost). This
> > > > makes it
> > > > >> > > > a pretty big change, and I haven't thought much on how to
> > > > implement
> > > > >> > > > it.
> > > > >> > > >
> > > > >> > > > I'm personally leaning towards (1) for sake of simplicity.
> > Note
> > > > that
> > > > >> > > > some users may not want dag to fail/retry even when dag is
> > > > updated,
> > > > >> so
> > > > >> > > > this should be an optional feature, not required.
> > > > >> > > >
> > > > >> > > > My scheduler-foo isn't that great, so curious what others
> have
> > > to
> > > > say
> > > > >> > > > about this.
> > > > >> > > >
> > > > >> > > > On Fri, Feb 23, 2018 at 3:12 PM, David Capwell <
> > > > dcapw...@gmail.com>
> > > > >> > > wrote:
> > > > >> > > > > Thanks for the reply Joy, let me walk you though things as
> > > they
> > > > are
> > > > >> > > today
> > > > >> > > > >
> > > > >> > > > > 1) we don't stop airflow or disable DAGs while deploying
> > > > updates to
> > > > >> > > > logic,
> > > > >> > > > > this is done live once it's released
> > > > >> > > > > 2) the python script in the DAG folder doesn't actually
> have
> > > > DAGs
> > > > >> in
> > > > >> > it
> > > > >> > > > but
> > > > >> > > > > is a shim layer to allow us to deploy in a atomic way for
> a
> > > > single
> > > > >> > host
> > > > >> > > > >   2.1) this script reads a file on local disk (less than
> > disk
> > > > page
> > > > >> > > size)
> > > > >> > > > to
> > > > >> > > > > find latest git commit deployed
> > > > >> > > > >   2.2) re-does the airflow DAG load process but pointing
> to
> > > the
> > > > git
> > > > >> > > > commit
> > > > >> > > > > path
> > > > >> > > > >
> > > > >> > > > > Example directory structure
> > > > >> > > > >
> > > > >> > > > > /airflow/dags/shim.py
> > > > >> > > > > /airflow/real_dags/
> > > > >> > > > >                             /latest # pointer to latest
> > commit
> > > > >> > > > >                             /[git commit]/
> > > > >> > > > >
> > > > >> > > > > This is how we make sure deploys are consistent within a
> > > single
> > > > >> task.
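> > > > >> > > > >
> > > > >> > > > > Stripped down, the shim amounts to something like this (a
> > > > >> > > > > simplified sketch, not the exact script):
> > > > >> > > > >
> > > > >> > > > > # /airflow/dags/shim.py  (sketch)
> > > > >> > > > > import os
> > > > >> > > > > from airflow.models import DagBag
> > > > >> > > > >
> > > > >> > > > > REAL_DAGS = "/airflow/real_dags"
> > > > >> > > > >
> > > > >> > > > > # tiny pointer file written at deploy time with the git commit
> > > > >> > > > > with open(os.path.join(REAL_DAGS, "latest")) as f:
> > > > >> > > > >     commit = f.read().strip()
> > > > >> > > > >
> > > > >> > > > > # re-do the DAG load against the versioned path, then re-export
> > > > >> > > > > # the DAGs into globals() so the scheduler/webserver find them
> > > > >> > > > > dag_folder = os.path.join(REAL_DAGS, commit)
> > > > >> > > > > bag = DagBag(dag_folder=dag_folder, include_examples=False)
> > > > >> > > > > for dag in bag.dags.values():
> > > > >> > > > >     globals()[dag.dag_id] = dag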
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > Now, lets assume we have a fully atomic commit process and
> > are
> > > > able
> > > > >> > to
> > > > >> > > > > upgrade DAGs at the exact same moment.
> > > > >> > > > >
> > > > >> > > > > At time T0 the scheduler knows of DAG V1 and schedules two
> > > > tasks,
> > > > >> > > Task1,
> > > > >> > > > > and Task2
> > > > >> > > > > At time T1 Task1 is picked up by Worker1, so starts
> > executing
> > > > the
> > > > >> > task
> > > > >> > > > (V1
> > > > >> > > > > logic)
> > > > >> > > > > At time T2 deploy commit happens, current DAG version: V2
> > > > >> > > > > At time T3, Task2 is picked up by Worker2, so starts
> > executing
> > > > the
> > > > >> > task
> > > > >> > > > (V2
> > > > >> > > > > logic)
> > > > >> > > > >
> > > > >> > > > > In many cases this isn't really a problem (tuning config
> > > change
> > > > to
> > > > >> > > hadoop
> > > > >> > > > > job), but as we have more people using Airflow this is
> > > causing a
> > > > >> lot
> > > > >> > of
> > > > >> > > > > time spent debugging why production acted differently than
> > > > expected
> > > > >> > > (the
> > > > >> > > > > problem was already fixed... why is it still here?).  We
> > also
> > > > see
> > > > >> > that
> > > > >> > > > some
> > > > >> > > > > tasks expect a given behavior from other tasks, and since
> > they
> > > > live
> > > > >> > in
> > > > >> > > > the
> > > > >> > > > > same git repo they can modify both tasks at the same time
> > if a
> > > > >> > breaking
> > > > >> > > > > change is needed, but when this rolls out to prod there
> > isn't
> > > a
> > > > way
> > > > >> > to
> > > > >> > > do
> > > > >> > > > > this other than turn off the DAG, and login to all hosts
> to
> > > > verify
> > > > >> > > fully
> > > > >> > > > > deployed.
> > > > >> > > > >
> > > > >> > > > > We would like to remove this confusion and make
> > > > >> generations/versions
> > > > >> > > > (same
> > > > >> > > > > thing really) exposed to users and make sure for a single
> > > > dag_run
> > > > >> > only
> > > > >> > > > one
> > > > >> > > > > version is used.
> > > > >> > > > >
> > > > >> > > > > I hope this is more clear.
> > > > >> > > > >
> > > > >> > > > > On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao <j...@wepay.com>
> > > > wrote:
> > > > >> > > > >
> > > > >> > > > >> Hi David,
> > > > >> > > > >>
> > > > >> > > > >> Do you mind providing a concrete example of the scenario
> in
> > > > which
> > > > >> > > > >> scheduler/workers see different states (I'm not 100% sure
> > if
> > > I
> > > > >> > > > understood
> > > > >> > > > >> the issue at hand).
> > > > >> > > > >>
> > > > >> > > > >> And by same dag generation, are you referring to the dag
> > > > version?
> > > > >> > (DAG
> > > > >> > > > >> version is currently not supported at all, but I can see
> it
> > > > being
> > > > >> a
> > > > >> > > > >> building block for future use cases).
> > > > >> > > > >>
> > > > >> > > > >> Joy
> > > > >> > > > >>
> > > > >> > > > >> On Fri, Feb 23, 2018 at 1:00 PM, David Capwell <
> > > > >> dcapw...@gmail.com>
> > > > >> > > > wrote:
> > > > >> > > > >>
> > > > >> > > > >> > My current thinking is to add a field to the dag table
> > that
> > > > is
> > > > >> > > > optional
> > > > >> > > > >> and
> > > > >> > > > >> > provided by the dag. We currently intercept the load
> path
> > > do
> > > > >> could
> > > > >> > > use
> > > > >> > > > >> this
> > > > >> > > > >> > field to make sure we load the same generation.  My
> > concern
> > > > here
> > > > >> > is
> > > > >> > > > the
> > > > >> > > > >> > interaction with the scheduler, not as familiar with
> that
> > > > logic
> > > > >> to
> > > > >> > > > >> predict
> > > > >> > > > >> > corner cases were this would fail.
> > > > >> > > > >> >
> > > > >> > > > >> > Any other recommendations for how this could be done?
> > > > >> > > > >> >
> > > > >> > > > >> > On Mon, Feb 19, 2018, 10:33 PM David Capwell <
> > > > >> dcapw...@gmail.com>
> > > > >> > > > wrote:
> > > > >> > > > >> >
> > > > >> > > > >> > > We have been using airflow for logic that delegates
> to
> > > > other
> > > > >> > > > systems so
> > > > >> > > > >> > > inject a task all tasks depends to make sure all
> > > resources
> > > > >> used
> > > > >> > > are
> > > > >> > > > the
> > > > >> > > > >> > > same for all tasks in the dag. This works well for
> > tasks
> > > > that
> > > > >> > > > delegates
> > > > >> > > > >> > to
> > > > >> > > > >> > > external systems but people are starting to need to
> run
> > > > logic
> > > > >> in
> > > > >> > > > >> airflow
> > > > >> > > > >> > > and the fact that scheduler and all workers can see
> > > > different
> > > > >> > > > states is
> > > > >> > > > >> > > causing issues
> > > > >> > > > >> > >
> > > > >> > > > >> > > We can make sure that all the code is deployed in a
> > > > consistent
> > > > >> > way
> > > > >> > > > but
> > > > >> > > > >> > > need help from the scheduler to tell the workers the
> > > > current
> > > > >> > > > generation
> > > > >> > > > >> > for
> > > > >> > > > >> > > a DAG.
> > > > >> > > > >> > >
> > > > >> > > > >> > > My question is, what would be the best way to modify
> > > > airflow
> > > > >> to
> > > > >> > > > allow
> > > > >> > > > >> > DAGs
> > > > >> > > > >> > > to define a generation value that the scheduler could
> > > send
> > > > to
> > > > >> > > > workers?
> > > > >> > > > >> > >
> > > > >> > > > >> > > Thanks
> > > > >> > > > >> > >
> > > > >> > > > >> >
> > > > >> > > > >>
> > > > >> > > >
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > > >
> > >
> >
>
