A few notes about the pickling topic to answer William:

* first reason why we cannot use pickles: Jinja template objects are not picklable. There's a hack to pickle the content of the template instead of the object (sketched below), but that breaks Jinja inheritance and imports
* pickles are messy, and Airflow allows users to attach objects to DAG objects (on_error_callable, on_retry_callable, params, ...); pickling will go down the recursive rabbit hole and sometimes pull in large chunks of what's in `sys.modules` (this could probably be mitigated)
* pickle is a messy serialization format with lots of drawbacks: security issues, incompatibility between py2 and py3, ...
* pickles have a bad reputation; many people have advised avoiding them like the plague since the feature was first built
* the original approach of pickling to the db is kind of a hack
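A minimal sketch of the first point, using only the `pickle` and `jinja2` libraries (illustrative only, not Airflow code):

    import pickle
    import jinja2

    # Jinja templates carry compiled render functions, so pickling the object fails.
    template = jinja2.Template("echo {{ ds }}")
    try:
        pickle.dumps(template)
    except Exception as exc:  # typically a PicklingError / TypeError
        print(f"cannot pickle template: {exc}")

    # The "hack": pickle the template source instead. It round-trips fine, but the
    # restored template gets a fresh environment, so {% extends %} / {% import %}
    # (inheritance and imports) no longer resolve.
    source = pickle.loads(pickle.dumps("echo {{ ds }}"))
    print(jinja2.Template(source).render(ds="2018-03-01"))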
I also agree that caching is probably required, especially around large DAGs and for "semi-stateless" web servers to operate properly.

Max

On Thu, Mar 1, 2018 at 1:15 PM, David Capwell <dcapw...@gmail.com> wrote:
> We need two versions but most likely would not use either... Those being artifactory and git (would really love for this to be pluggable!!!!!)
> We have our own dag fetch logic which right now pulls from git, caches, then redirects airflow to that directory. For us we have airflow automated so you push a button to get a cluster; for this reason there are enough instances that we have DDoS attacked git (oops).
> We are planning to change this to fetch from artifactory, and have a stateful proxy for each cluster so we stop DDoS attacking core infrastructure.
> On Mar 1, 2018 11:45 AM, "William Wong" <wongwil...@gmail.com> wrote:
> > Also relatively new to Airflow here. Same as David above, Option 1 is not an option for us either, for the same reasons.
> > What I would like to see is that it can be user selectable / modifiable.
> > Use Case:
> > We have a DAG with thousands of task dependencies/tasks. After 24hrs of progressing, we need to take a subset of those tasks and rerun them with a different configuration (reasons range from incorrect parameters to infrastructure issues, doesn't really matter here).
> > What I hope can happen:
> > 1. Pause DAG
> > 2. Upload and tag newest dag version
> > 3. Set dag_run to use latest tag
> > 4. Resolve DAG sync using <insert smart diff behavior here that is clearly defined/documented>
> > 5. Unpause DAG
> > I do like the DagFetcher idea. This logic should shim in nicely in the DagBag code. Maxime, I also vote for the GitDagFetcher. Two thoughts about the GitDagFetcher:
> > - I probably won't use fuse across 100's of nodes in my k8s/swarm. Not sure how this would work without too much trouble.
> > - It might be confusing if some git sha's have no changes to a DAG. Will all existing runs be marked as outdated? Probably better than nothing anyway.
> > I also vote to have some form of caching behavior. I prefer not to read in DAGs all the time, i.e. from the webserver, scheduler, *and* all workers before starting any task, over and over again. This is because, unfortunately, the assumption that a DAG only takes seconds to load does not hold true for large dags. With only 10k tasks within a DAG it's already on the order of minutes. This would be untenable as we scale up to even larger DAGs. (Though, I'm testing a fix for this, so maybe this might not actually be an issue anymore.)
> > FWIW, it seems to me that the DagPickle feature (which, for the love of me, I can't seem to get to work; no wonder it's being deprecated) would have solved a lot of these issues fairly easily. Something along the lines of adding pickle_id to dag_run should at least help the scheduler identify the DAG version to load and queue. But I'm not sure if it can delete out-of-sync task instances.
> > Lastly, sorry for the brain dump and for derailing the topic: for the workers, it seems that importing/loading the DAG just to execute a single task is a bit of overkill, isn't it? If we kept a caching feature (i.e. pickling), perhaps we could simply cache the task and not worry about the rest of the DAG tasks?
> > Will
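A minimal sketch of the kind of caching William is asking for, keyed on a content hash of the DAG file; the function and cache here are hypothetical helpers, not Airflow's DagBag API:

    import hashlib
    import importlib.util
    from pathlib import Path

    _cache = {}  # fileloc -> (content hash, parsed module)

    def load_dag_module(fileloc: str):
        """Re-parse a DAG file only when its content has actually changed."""
        source = Path(fileloc).read_bytes()
        digest = hashlib.sha256(source).hexdigest()
        cached = _cache.get(fileloc)
        if cached and cached[0] == digest:
            return cached[1]  # unchanged since the last parse, reuse it
        spec = importlib.util.spec_from_file_location("dag_module", fileloc)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # the expensive part for 10k-task DAGs
        _cache[fileloc] = (digest, module)
        return module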
> > On Thu, Mar 1, 2018 at 11:30 AM, Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
> > > I'm curious to hear which DagFetcher abstraction people would build or want to use.
> > > So far it sounded like the most popular and flexible approach would be a `GitDagFetcher` where all SHAs and refs become a possibility, as opposed to say a TarballOnS3DagFetcher, which would require more manual artifact management and versioning and represents additional [human] workflow on top of the already existing git-based workflow.
> > > One way I've seen this done before is by using this Git fuse (file system in user space) hack that creates a virtual filesystem where all SHAs and refs in the Git repo are exposed as subfolders, and under each ref subfolder the whole repo sits as of that ref. Of course all the files are virtual and fetched at access time by the virtual filesystem using the git api. So if you simply point the DagBag loader to the right [virtual] directory, it will import the right version of the DAG. In the git world, the alternative to that is managing temp folders and doing shallow clones, which seems like much more of a headache. Note that one tradeoff is that git, and whatever it depends on, then needs to be highly available.
> > > Max
> > > On Wed, Feb 28, 2018 at 6:55 PM, David Capwell <dcapw...@gmail.com> wrote:
> > > > Thanks for all the details! With a pluggable fetcher we would be able to add our own logic for how to fetch, so this sounds like a good place to start for something like this!
> > > > On Wed, Feb 28, 2018, 4:39 PM Joy Gao <j...@wepay.com> wrote:
> > > > > +1 on DagFetcher abstraction, very airflow-esque :)
> > > > > On Wed, Feb 28, 2018 at 11:25 AM, Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
> > > > > > Addressing a few of your questions / concerns:
> > > > > > * The scheduler uses a multiprocess queue to queue up tasks; each subprocess is in charge of a single DAG "scheduler cycle" which triggers what it can for active DagRuns. Currently it fills the DagBag from the local file system, looking for a specific module where the master process last saw that DAG. Fetching the DAG is a metadata operation, DAG artifacts shouldn't be too large, and we can assume that it takes seconds at most to fetch a DAG, which is ok. We generally assume that the scheduler should fully cycle every minute or so. A version-aware DagFetcher could also implement some sort of caching if that was a concern (it shouldn't be though).
> > > > > > * For consistency within the whole DagRun, the scheduler absolutely has to read the right version. If tasks got removed they would never get scheduled and consistency could not be achieved.
> > > > > > * TaskInstances get created the first time they are identified as runnable by the scheduler and are born with a queued status I believe (from memory, haven't read the latest code to confirm). The worker double-checks and sets it as running as part of a database transaction to avoid double-firing.
> > > > > > Max
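A rough sketch of the fuse-vs-shallow-clone tradeoff described above; the class name, method names, and mountpoint layout are assumptions, not an existing Airflow API:

    import subprocess
    import tempfile
    from pathlib import Path
    from typing import Optional

    class GitDagFetcher:
        """Resolve a git ref to a directory holding the DAG files as of that ref."""

        def __init__(self, repo_url: str, fuse_mount: Optional[str] = None):
            self.repo_url = repo_url
            self.fuse_mount = fuse_mount  # e.g. a git fuse mountpoint exposing refs as folders

        def fetch(self, ref: str) -> Path:
            if self.fuse_mount:
                # With a git fuse filesystem every SHA/ref is already a subfolder,
                # so "fetching" is just building a path for the DagBag loader.
                return Path(self.fuse_mount) / ref
            # Alternative mentioned above: a shallow clone into a temp folder.
            # (Works for branches/tags; a raw SHA would need a fetch + checkout instead.)
            target = Path(tempfile.mkdtemp(prefix="dags-"))
            subprocess.run(
                ["git", "clone", "--depth", "1", "--branch", ref, self.repo_url, str(target)],
                check=True,
            )
            return target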
> > > > > > On Wed, Feb 28, 2018 at 7:29 AM, Chris Palmer <ch...@crpalmer.com> wrote:
> > > > > > > I'll preface this with the fact that I'm relatively new to Airflow, and haven't played around with a lot of the internals.
> > > > > > > I find the idea of a DagFetcher interesting, but would we worry about slowing down the scheduler significantly? If the scheduler is having to "fetch" multiple different DAG versions, be it git refs or artifacts from Artifactory, we are talking about adding significant time to each scheduler run. Also, how would the scheduler know which DAGs to fetch from where if there aren't local files on disk listing those DAGs? Maybe I'm missing something in the implementation.
> > > > > > > It seems to me that the fetching of the different versions should be delegated to the Task (or TaskInstance) itself. That ensures we only spend the time to "fetch" the version that is needed when it is needed. One downside might be that each TaskInstance running for the same version of the DAG might end up doing the "fetch" independently (duplicating that work).
> > > > > > > I think this could be done by adding some version attribute to the DagRun that gets set at creation, and having the scheduler pass that version to the TaskInstances when they are created. You could even extend this so that you could have an arbitrary set of "executor_parameters" that get set on a DagRun and are passed to TaskInstances. Then the specific Executor class that is running that TaskInstance could handle the "executor_parameters" as it sees fit.
> > > > > > > One thing I'm not clear on is how and when TaskInstances are created. When the scheduler first sees a specific DagRun, do all the TaskInstances get created immediately, but only some of them get queued? Or does the scheduler only create those TaskInstances which can be queued right now?
> > > > > > > In particular, if a DagRun gets created and while it is running the DAG is updated and a new Task is added, will the scheduler pick up that new Task for the running DagRun? If the answer is yes, then my suggestion above would run the risk of scheduling a Task for a DAG version where that Task didn't exist. I'm sure you could handle that somewhat gracefully, but it's a bit ugly.
> > > > > > > Chris
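An illustration of the data flow Chris is proposing: stamp a version on the DagRun at creation and copy it onto each TaskInstance. These are plain dataclasses for illustration, not Airflow's actual models:

    from dataclasses import dataclass, field

    @dataclass
    class DagRun:
        dag_id: str
        execution_date: str
        version: str                          # stamped once, when the DagRun is created
        executor_parameters: dict = field(default_factory=dict)

    @dataclass
    class TaskInstance:
        dag_id: str
        task_id: str
        execution_date: str
        version: str                          # inherited from the owning DagRun

    def create_task_instance(dag_run: DagRun, task_id: str) -> TaskInstance:
        # The scheduler copies the DagRun's version onto every TaskInstance, so the
        # worker (or its fetcher) can load exactly that version and only that one.
        return TaskInstance(dag_run.dag_id, task_id, dag_run.execution_date, dag_run.version)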
> > > > > > > On Wed, Feb 28, 2018 at 2:05 AM, Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
> > > > > > > > At a higher level I want to say a few things about the idea of enforcing version consistency within a DagRun.
> > > > > > > > One thing we've been talking about is the need for a "DagFetcher" abstraction, whose first implementation, which would replace and mimic the current one, would be "FileSystemDagFetcher". One specific DagFetcher implementation may or may not support version semantics, but if it does, it should be able to receive a version id and return the proper version of the DAG object. For instance, that first "FileSystemDagFetcher" would not support version semantics, but perhaps a "GitRepoDagFetcher" would, or an "ArtifactoryDagFetcher" or "TarballInS3DagFetcher" may as well.
> > > > > > > > Of course that assumes that the scheduler knows and stores the active version number when generating a new DagRun, and that this information is leveraged on subsequent scheduler cycles and on workers when tasks are executed.
> > > > > > > > This could also enable things like "remote" backfills (non-local, parallelized) of a DAG definition that's on an arbitrary git ref (assuming a "GitRepoDagFetcher").
> > > > > > > > There are [perhaps] unintuitive implications where clearing a single task would then re-run the old DAG definition on that task (since the version was stamped in the DagRun and hasn't changed), but deleting/recreating a DagRun would run the latest version (or any other version that may be specified, for that matter).
> > > > > > > > I'm unclear on how much work that represents exactly, but it's certainly doable and may only require changing part of the DagBag class and a few other places.
> > > > > > > > Max
> > > > > > > > On Tue, Feb 27, 2018 at 6:48 PM, David Capwell <dcapw...@gmail.com> wrote:
> > > > > > > > > Thanks for your feedback!
> > > > > > > > > Option 1 is a non-starter for us. The reason is we have DAGs that take 9+ hours to run.
> > > > > > > > > Option 2 is more where my mind was going, but it's rather large. How I see it, you need an MVCC DagBag that's aware of multiple versions (what provides the version?). Assuming you can track which versions active dag runs point to, you know how to clean up (fine with that being external). The pro here is you have snapshot isolation for a dag_run; the con is more bookkeeping and requiring deploys to work with this (the last part may be a good thing though).
> > > > > > > > > The only other option I can think of is to lock deploys so the system only picks up new versions when no dag_run holds the lock. This is flawed for many reasons, but breaks horribly for dag_runs that take minutes (I assume 99% do).
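To make the DagFetcher abstraction above concrete, here is a sketch with one version-unaware and one version-aware implementation; none of these classes exist in Airflow today, and the real interface could end up looking quite different:

    from abc import ABC, abstractmethod
    from typing import Optional

    from airflow.models import DagBag  # existing Airflow class

    class DagFetcher(ABC):
        @abstractmethod
        def fetch(self, dag_id: str, version: Optional[str] = None):
            """Return the DAG object, optionally at a specific version."""

    class FileSystemDagFetcher(DagFetcher):
        """Mimics current behavior: load from the local dags folder, ignore version."""

        def __init__(self, dag_folder: str):
            self.dag_folder = dag_folder

        def fetch(self, dag_id: str, version: Optional[str] = None):
            return DagBag(self.dag_folder).get_dag(dag_id)

    class GitRepoDagFetcher(DagFetcher):
        """Version-aware: `version` is a git SHA or ref to load the DAG from."""

        def __init__(self, repo_url: str):
            self.repo_url = repo_url

        def fetch(self, dag_id: str, version: Optional[str] = None):
            dag_dir = self._checkout(version or "master")
            return DagBag(dag_dir).get_dag(dag_id)

        def _checkout(self, ref: str) -> str:
            # e.g. a shallow clone of `ref` into a temp dir, as in the earlier sketch
            raise NotImplementedError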
> > > > > > > > > On Tue, Feb 27, 2018, 4:50 PM Joy Gao <j...@wepay.com> wrote:
> > > > > > > > > > Hi David!
> > > > > > > > > > Thank you for clarifying, I think I understand your concern now. We currently also work around this by making sure a dag is turned off when we deploy a new version. We also make sure our jobs are idempotent and retry-enabled in the case when we forget to turn off the job, so the issue hasn't caused us too much headache.
> > > > > > > > > > I do agree that it would be nice for Airflow to have the option to guarantee a single version of a dag per dag run. I see two approaches:
> > > > > > > > > > (1) If a dag is updated, the current dagrun fails and/or retries.
> > > > > > > > > > (2) If a dag is updated, the current dagrun continues but uses the version before the update.
> > > > > > > > > > (1) requires some mechanism to compare dag generations. One option is to hash the dag file, store that value in the dagrun table, and compare against it each time a task runs. In the case the hash value is different, update the hash value, then fail/retry the dag. I think this is a fairly safe approach.
> > > > > > > > > > (2) is trickier. A dag only has a property "fileloc" which tracks the location of the dag file, but the actual content of the dag file is never versioned. When a task instance starts running, it dynamically re-processes the dag file specified by the fileloc, generates all the task objects from the dag file, and fetches the task object by task_id in order to execute it. So in order to guarantee that each dagrun runs a specific version, previous versions must be maintained on disk somehow (maintaining this information in memory is difficult, since if the scheduler/worker shuts down, that information is lost). This makes it a pretty big change, and I haven't thought much about how to implement it.
> > > > > > > > > > I'm personally leaning towards (1) for the sake of simplicity. Note that some users may not want the dag to fail/retry even when the dag is updated, so this should be an optional feature, not required.
> > > > > > > > > > My scheduler-foo isn't that great, so curious what others have to say about this.
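A minimal sketch of Joy's approach (1), assuming a hypothetical `dag_hash` value stored with the dagrun (the attribute and function names are made up for illustration):

    import hashlib

    def file_hash(fileloc: str) -> str:
        with open(fileloc, "rb") as f:
            return hashlib.sha256(f.read()).hexdigest()

    def same_generation(dag_run, fileloc: str) -> bool:
        """Return False if the dag file changed since this dagrun was stamped."""
        current = file_hash(fileloc)
        if dag_run.dag_hash is None:       # first check for this run: stamp it
            dag_run.dag_hash = current
            return True
        if dag_run.dag_hash != current:    # the dag changed mid-run
            dag_run.dag_hash = current     # per (1): update the hash, then fail/retry
            return False
        return True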
> > > > > > > > > > On Fri, Feb 23, 2018 at 3:12 PM, David Capwell <dcapw...@gmail.com> wrote:
> > > > > > > > > > > Thanks for the reply Joy, let me walk you through things as they are today:
> > > > > > > > > > > 1) we don't stop airflow or disable DAGs while deploying updates to logic, this is done live once it's released
> > > > > > > > > > > 2) the python script in the DAG folder doesn't actually have DAGs in it but is a shim layer that allows us to deploy in an atomic way for a single host
> > > > > > > > > > >   2.1) this script reads a file on local disk (less than disk page size) to find the latest git commit deployed
> > > > > > > > > > >   2.2) it re-does the airflow DAG load process but pointing to the git commit path (see the sketch below)
> > > > > > > > > > > Example directory structure:
> > > > > > > > > > > /airflow/dags/shim.py
> > > > > > > > > > > /airflow/real_dags/
> > > > > > > > > > >     /latest        # pointer to latest commit
> > > > > > > > > > >     /[git commit]/
> > > > > > > > > > > This is how we make sure deploys are consistent within a single task.
> > > > > > > > > > > Now, let's assume we have a fully atomic commit process and are able to upgrade DAGs at the exact same moment.
> > > > > > > > > > > At time T0 the scheduler knows of DAG V1 and schedules two tasks, Task1 and Task2.
> > > > > > > > > > > At time T1 Task1 is picked up by Worker1, which starts executing the task (V1 logic).
> > > > > > > > > > > At time T2 the deploy commit happens; current DAG version: V2.
> > > > > > > > > > > At time T3 Task2 is picked up by Worker2, which starts executing the task (V2 logic).
> > > > > > > > > > > In many cases this isn't really a problem (a tuning config change to a hadoop job), but as we have more people using Airflow this is causing a lot of time spent debugging why production acted differently than expected (the problem was already fixed... why is it still here?). We also see that some tasks expect a given behavior from other tasks, and since they live in the same git repo they can modify both tasks at the same time if a breaking change is needed; but when this rolls out to prod there isn't a way to do this other than turning off the DAG and logging in to all hosts to verify it's fully deployed.
> > > > > > > > > > > We would like to remove this confusion, make generations/versions (same thing really) exposed to users, and make sure that for a single dag_run only one version is used.
> > > > > > > > > > > I hope this is more clear.
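A minimal sketch of what such a shim.py could look like under the directory layout above; the paths and the pointer-file format are illustrative, not David's actual script:

    from pathlib import Path

    from airflow.models import DagBag  # existing Airflow class

    REAL_DAGS = Path("/airflow/real_dags")

    # 2.1) read the small pointer file to find the latest deployed git commit
    commit = (REAL_DAGS / "latest").read_text().strip()

    # 2.2) redo the normal DAG load against that commit's directory, so every task
    # on this host sees one consistent version of the code
    dag_bag = DagBag(str(REAL_DAGS / commit))

    # expose the loaded DAGs at module level so Airflow's own loader picks them up
    for dag_id, dag in dag_bag.dags.items():
        globals()[dag_id] = dag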
> > > > > > > > > > > On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao <j...@wepay.com> wrote:
> > > > > > > > > > > > Hi David,
> > > > > > > > > > > > Do you mind providing a concrete example of the scenario in which scheduler/workers see different states (I'm not 100% sure if I understood the issue at hand)?
> > > > > > > > > > > > And by same dag generation, are you referring to the dag version? (DAG version is currently not supported at all, but I can see it being a building block for future use cases.)
> > > > > > > > > > > > Joy
> > > > > > > > > > > > On Fri, Feb 23, 2018 at 1:00 PM, David Capwell <dcapw...@gmail.com> wrote:
> > > > > > > > > > > > > My current thinking is to add a field to the dag table that is optional and provided by the dag. We currently intercept the load path, so we could use this field to make sure we load the same generation. My concern here is the interaction with the scheduler; I'm not familiar enough with that logic to predict corner cases where this would fail.
> > > > > > > > > > > > > Any other recommendations for how this could be done?
> > > > > > > > > > > > > On Mon, Feb 19, 2018, 10:33 PM David Capwell <dcapw...@gmail.com> wrote:
> > > > > > > > > > > > > > We have been using airflow for logic that delegates to other systems, so we inject a task that all tasks depend on to make sure all resources used are the same for all tasks in the dag. This works well for tasks that delegate to external systems, but people are starting to need to run logic in airflow, and the fact that the scheduler and all workers can see different states is causing issues.
> > > > > > > > > > > > > > We can make sure that all the code is deployed in a consistent way but need help from the scheduler to tell the workers the current generation for a DAG.
> > > > > > > > > > > > > > My question is, what would be the best way to modify airflow to allow DAGs to define a generation value that the scheduler could send to workers?
> > > > > > > > > > > > > > Thanks
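For what David's original proposal might look like from a DAG author's point of view, here is a sketch; a first-class `generation` field does not exist in Airflow, so the value is tucked into `params` purely for illustration:

    from datetime import datetime

    from airflow import DAG

    # e.g. the git commit that the deploy tooling wrote to the pointer file on disk
    GENERATION = "9f2c1ab"

    dag = DAG(
        dag_id="example_versioned_dag",
        start_date=datetime(2018, 1, 1),
        schedule_interval="@daily",
        # stand-in for the proposed optional, DAG-provided generation value that the
        # scheduler would record and send along to workers
        params={"generation": GENERATION},
    )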