Thanks for all the details! With a pluggable fetcher we would be able to add our own logic for how to fetch DAGs, so that sounds like a good place to start for something like this!
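To make sure I'm picturing the same thing, here is a rough sketch of the kind of interface I have in mind. None of these classes exist in Airflow today, the names are made up, and the git handling is deliberately simplified:

import os
import subprocess

from airflow.models import DagBag


class DagFetcher(object):
    """Hypothetical pluggable interface: resolve (dag_id, version) to a DAG object."""

    def fetch_dag(self, dag_id, version=None):
        # version may be ignored by fetchers that have no version semantics
        raise NotImplementedError


class FileSystemDagFetcher(DagFetcher):
    """Mimics today's behavior: load whatever is currently on local disk."""

    def __init__(self, dags_folder):
        self.dags_folder = dags_folder

    def fetch_dag(self, dag_id, version=None):
        return DagBag(dag_folder=self.dags_folder).get_dag(dag_id)


class GitRepoDagFetcher(DagFetcher):
    """Version-aware: materialize a specific git ref, then load from it."""

    def __init__(self, repo_url, checkout_root):
        self.repo_url = repo_url
        self.checkout_root = checkout_root

    def fetch_dag(self, dag_id, version=None):
        ref = version or "HEAD"
        target = os.path.join(self.checkout_root, ref)
        if not os.path.isdir(target):
            # one checkout per ref, reused across tasks that need the same version
            subprocess.check_call(["git", "clone", self.repo_url, target])
            subprocess.check_call(["git", "checkout", ref], cwd=target)
        return DagBag(dag_folder=target).get_dag(dag_id)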
On Wed, Feb 28, 2018, 4:39 PM Joy Gao <[email protected]> wrote:

> +1 on DagFetcher abstraction, very airflow-esque :)
>
> On Wed, Feb 28, 2018 at 11:25 AM, Maxime Beauchemin <[email protected]> wrote:
> > Addressing a few of your questions / concerns:
> >
> > * The scheduler uses a multiprocess queue to queue up tasks, each subprocess is in charge of a single DAG "scheduler cycle" which triggers what it can for active DagRuns. Currently it fills the DagBag from the local file system, looking for a specific module where the master process last saw that DAG. Fetching the DAG is a metadata operation, DAG artifacts shouldn't be too large, we can assume that it takes seconds at most to fetch a DAG, which is ok. We generally assume that the scheduler should fully cycle every minute or so. A version-aware DagFetcher could also implement some sort of caching if that was a concern (shouldn't be though).
> > * For consistency within the whole DagRun, the scheduler absolutely has to read the right version. If tasks got removed they would never get scheduled and consistency cannot be achieved.
> > * TaskInstances get created the first time they are identified as runnable by the scheduler and are born with a queued status I believe (from memory, haven't read the latest code to confirm). The worker double checks and sets it as running as part of a database transaction to avoid double-firing.
> >
> > Max
> >
> > On Wed, Feb 28, 2018 at 7:29 AM, Chris Palmer <[email protected]> wrote:
> >> I'll preface this with the fact that I'm relatively new to Airflow, and haven't played around with a lot of the internals.
> >>
> >> I find the idea of a DagFetcher interesting but would we worry about slowing down the scheduler significantly? If the scheduler is having to "fetch" multiple different DAG versions, be it git refs or artifacts from Artifactory, we are talking about adding significant time to each scheduler run. Also how would the scheduler know which DAGs to fetch from where if there aren't local files on disk listing those DAGs? Maybe I'm missing something in the implementation.
> >>
> >> It seems to me that the fetching of the different versions should be delegated to the Task (or TaskInstance) itself. That ensures we only spend the time to "fetch" the version that is needed when it is needed. One downside might be that each TaskInstance running for the same version of the DAG might end up doing the "fetch" independently (duplicating that work).
> >>
> >> I think this could be done by adding some version attribute to the DagRun that gets set at creation, and having the scheduler pass that version to the TaskInstances when they are created. You could even extend this so that you could have an arbitrary set of "executor_parameters" that get set on a DagRun and are passed to TaskInstances. Then the specific Executor class that is running that TaskInstance could handle the "executor_parameters" as it sees fit.
> >>
> >> One thing I'm not clear on is how and when TaskInstances are created. When the scheduler first sees a specific DagRun, do all the TaskInstances get created immediately but only some of them get queued? Or does the scheduler only create those TaskInstances which can be queued right now?
> >>
> >> In particular, if a DagRun gets created and while it is running the DAG is updated and a new Task is added, will the scheduler pick up that new Task for the running DagRun? If the answer is yes, then my suggestion above would run the risk of scheduling a Task for a DAG version where that Task didn't exist. I'm sure you could handle that somewhat gracefully but it's a bit ugly.
> >>
> >> Chris
> >>
> >> On Wed, Feb 28, 2018 at 2:05 AM, Maxime Beauchemin <[email protected]> wrote:
> >> > At a higher level I want to say a few things about the idea of enforcing version consistency within a DagRun.
> >> >
> >> > One thing we've been talking about is the need for a "DagFetcher" abstraction, where its first implementation, which would replace and mimic the current behavior, would be "FileSystemDagFetcher". One specific DagFetcher implementation may or may not support version semantics, but if it does it should be able to receive a version id and return the proper version of the DAG object. For instance, that first "FileSystemDagFetcher" would not support version semantics, but perhaps a "GitRepoDagFetcher" would, and an "ArtifactoryDagFetcher" or "TarballInS3DagFetcher" may as well.
> >> >
> >> > Of course that assumes that the scheduler knows and stores the active version number when generating a new DagRun, and that this information is leveraged on subsequent scheduler cycles and on workers when tasks are executed.
> >> >
> >> > This could also enable things like "remote" backfills (non-local, parallelized) of a DAG definition that's on an arbitrary git ref (assuming a "GitRepoDagFetcher").
> >> >
> >> > There are [perhaps] unintuitive implications where clearing a single task would then re-run the old DAG definition for that task (since the version was stamped in the DagRun and hasn't changed), but deleting/recreating a DagRun would run the latest version (or any other version that may be specified, for that matter).
> >> >
> >> > I'm unclear on how much work that represents exactly, but it's certainly doable and may only require changing part of the DagBag class and a few other places.
> >> >
> >> > Max
> >> >
> >> > On Tue, Feb 27, 2018 at 6:48 PM, David Capwell <[email protected]> wrote:
> >> > > Thanks for your feedback!
> >> > >
> >> > > Option 1 is a non-starter for us. The reason is we have DAGs that take 9+ hours to run.
> >> > >
> >> > > Option 2 is more where my mind was going, but it's rather large. How I see it, you need an MVCC DagBag that's aware of multiple versions (what provides the version?). Assuming you can track which versions active dag runs point to, you know how to clean up (fine with that being external). The pro here is you have snapshot isolation for the dag_run; the con is more bookkeeping and requiring deploys to work with this (that last part may be a good thing though).
> >> > >
> >> > > The only other option I can think of is to lock deploys so the system only picks up new versions when no dag_run holds the lock. This is flawed for many reasons, but breaks horribly for dag_runs that take minutes (I assume 99% do).
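(Interjecting on Max's point above about the scheduler storing the active version when it creates a DagRun: a rough sketch of what that could look like. The `version` column on DagRun and the `current_version()` fetcher call are assumptions, nothing like this exists in Airflow today.)

from airflow.models import DagRun
from airflow.utils.state import State


def create_versioned_dag_run(dag, execution_date, session, fetcher):
    """Scheduler side: pin the new run to whatever version is active right now."""
    run = DagRun(
        dag_id=dag.dag_id,
        run_id="scheduled__%s" % execution_date.isoformat(),
        execution_date=execution_date,
        state=State.RUNNING,
    )
    run.version = fetcher.current_version()  # hypothetical column + fetcher API
    session.add(run)
    session.commit()
    return run


def dag_for_task_instance(task_instance, session, fetcher):
    """Worker/scheduler side: always materialize the DAG at the version pinned on the run."""
    dag_run = task_instance.get_dagrun(session=session)
    return fetcher.fetch_dag(task_instance.dag_id, version=dag_run.version)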
> >> > >
> >> > > On Tue, Feb 27, 2018, 4:50 PM Joy Gao <[email protected]> wrote:
> >> > > > Hi David!
> >> > > >
> >> > > > Thank you for clarifying, I think I understand your concern now. We currently also work around this by making sure a dag is turned off when we deploy a new version. We also make sure our jobs are idempotent and retry-enabled for the case when we forget to turn off the job, so the issue hasn't caused us too much headache.
> >> > > >
> >> > > > I do agree that it would be nice for Airflow to have the option to guarantee a single version of a dag per dag run. I see two approaches:
> >> > > >
> >> > > > (1) If a dag is updated, the current dagrun fails and/or retries.
> >> > > > (2) If a dag is updated, the current dagrun continues but uses the version before the update.
> >> > > >
> >> > > > (1) requires some mechanism to compare dag generations. One option is to hash the dag file, store that value in the dagrun table, and compare against it each time a task runs. If the hash value is different, update the hash value, then fail/retry the dag. I think this is a fairly safe approach.
> >> > > >
> >> > > > (2) is trickier. A dag only has a property "fileloc" which tracks the location of the dag file, but the actual content of the dag file is never versioned. When a task instance starts running, it dynamically re-processes the dag file specified by the fileloc, generates all the task objects from the dag file, and fetches the task object by task_id in order to execute it. So in order to guarantee that each dagrun runs a specific version, previous versions must be maintained on disk somehow (maintaining this information in memory is difficult, since if the scheduler/worker shuts down, that information is lost). This makes it a pretty big change, and I haven't thought much about how to implement it.
> >> > > >
> >> > > > I'm personally leaning towards (1) for the sake of simplicity. Note that some users may not want a dag to fail/retry even when the dag is updated, so this should be an optional feature, not required.
> >> > > >
> >> > > > My scheduler-foo isn't that great, so I'm curious what others have to say about this.
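(On option (1) above: a minimal sketch of the hash check as I read it, assuming a `dag_hash` column were added to the dag_run table and populated when the run is created.)

import hashlib


def dag_file_fingerprint(fileloc):
    """Hash the dag file contents; fileloc is the path Airflow already tracks on the DAG."""
    with open(fileloc, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()


def dag_changed_since_run_created(dag, dag_run):
    """True when the file on disk no longer matches the hash stamped on the dagrun;
    at that point the run could be failed or retried if the user opted in."""
    return (
        dag_run.dag_hash is not None  # hypothetical column on dag_run
        and dag_run.dag_hash != dag_file_fingerprint(dag.fileloc)
    )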
> >> > > >
> >> > > > On Fri, Feb 23, 2018 at 3:12 PM, David Capwell <[email protected]> wrote:
> >> > > > > Thanks for the reply Joy, let me walk you through things as they are today:
> >> > > > >
> >> > > > > 1) We don't stop Airflow or disable DAGs while deploying updates to logic; this is done live once it's released.
> >> > > > > 2) The python script in the DAG folder doesn't actually have DAGs in it but is a shim layer that allows us to deploy in an atomic way for a single host.
> >> > > > >   2.1) This script reads a file on local disk (less than a disk page in size) to find the latest git commit deployed.
> >> > > > >   2.2) It re-does the Airflow DAG load process but pointing to the git commit path.
> >> > > > >
> >> > > > > Example directory structure:
> >> > > > >
> >> > > > >   /airflow/dags/shim.py
> >> > > > >   /airflow/real_dags/
> >> > > > >     /latest        # pointer to latest commit
> >> > > > >     /[git commit]/
> >> > > > >
> >> > > > > This is how we make sure deploys are consistent within a single task.
> >> > > > >
> >> > > > > Now, let's assume we have a fully atomic commit process and are able to upgrade DAGs at the exact same moment.
> >> > > > >
> >> > > > > At time T0 the scheduler knows of DAG V1 and schedules two tasks, Task1 and Task2.
> >> > > > > At time T1 Task1 is picked up by Worker1, which starts executing the task (V1 logic).
> >> > > > > At time T2 the deploy commit happens; current DAG version: V2.
> >> > > > > At time T3 Task2 is picked up by Worker2, which starts executing the task (V2 logic).
> >> > > > >
> >> > > > > In many cases this isn't really a problem (a tuning config change to a hadoop job), but as we have more people using Airflow this is causing a lot of time spent debugging why production acted differently than expected (the problem was already fixed... why is it still here?). We also see that some tasks expect a given behavior from other tasks, and since they live in the same git repo people can modify both tasks at the same time if a breaking change is needed; but when this rolls out to prod there isn't a way to do this other than turning off the DAG and logging in to all hosts to verify it is fully deployed.
> >> > > > >
> >> > > > > We would like to remove this confusion, expose generations/versions (same thing really) to users, and make sure that for a single dag_run only one version is used.
> >> > > > >
> >> > > > > I hope this is more clear.
> >> > > > >
> >> > > > > On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao <[email protected]> wrote:
> >> > > > >> Hi David,
> >> > > > >>
> >> > > > >> Do you mind providing a concrete example of the scenario in which the scheduler/workers see different states? (I'm not 100% sure I understood the issue at hand.)
> >> > > > >>
> >> > > > >> And by same dag generation, are you referring to the dag version? (DAG version is currently not supported at all, but I can see it being a building block for future use cases.)
> >> > > > >>
> >> > > > >> Joy
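(For context on the shim layer described above: it boils down to roughly the following. The paths follow the example layout in the quoted mail, and this is an illustrative sketch rather than the actual script.)

# /airflow/dags/shim.py -- the only file Airflow parses directly (sketch)
import os

from airflow.models import DagBag

DEPLOY_ROOT = "/airflow/real_dags"


def _current_commit():
    # Small pointer file, rewritten atomically on deploy, naming the latest git commit.
    with open(os.path.join(DEPLOY_ROOT, "latest")) as f:
        return f.read().strip()


def _export_dags_from(path):
    # Re-run Airflow's normal DAG loading against the pinned commit directory, then
    # expose the DAGs as module-level globals so the scheduler discovers them here.
    dagbag = DagBag(dag_folder=path)
    for dag_id, dag in dagbag.dags.items():
        globals()[dag_id] = dag


_export_dags_from(os.path.join(DEPLOY_ROOT, _current_commit()))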
> >> > > > >>
> >> > > > >> On Fri, Feb 23, 2018 at 1:00 PM, David Capwell <[email protected]> wrote:
> >> > > > >> > My current thinking is to add a field to the dag table that is optional and provided by the dag. We currently intercept the load path, so we could use this field to make sure we load the same generation. My concern here is the interaction with the scheduler; I'm not familiar enough with that logic to predict corner cases where this would fail.
> >> > > > >> >
> >> > > > >> > Any other recommendations for how this could be done?
> >> > > > >> >
> >> > > > >> > On Mon, Feb 19, 2018, 10:33 PM David Capwell <[email protected]> wrote:
> >> > > > >> > > We have been using Airflow for logic that delegates to other systems, so we inject a task that all tasks depend on to make sure the resources used are the same for all tasks in the dag. This works well for tasks that delegate to external systems, but people are starting to need to run logic in Airflow, and the fact that the scheduler and the workers can see different states is causing issues.
> >> > > > >> > >
> >> > > > >> > > We can make sure that all the code is deployed in a consistent way, but we need help from the scheduler to tell the workers the current generation for a DAG.
> >> > > > >> > >
> >> > > > >> > > My question is: what would be the best way to modify Airflow to allow DAGs to define a generation value that the scheduler could send to workers?
> >> > > > >> > >
> >> > > > >> > > Thanks
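(Circling back to the original question at the bottom of the thread, the "generation provided by the DAG" idea might look roughly like this on our side. Carrying the value in `params` and the worker-side guard are assumptions about how the pieces could be wired together, not existing Airflow behavior.)

from datetime import datetime

from airflow.models import DAG

# Stamped by deploy tooling, e.g. the git commit the shim resolved (illustrative value).
GENERATION = "a1b2c3d"

dag = DAG(
    dag_id="example_versioned_dag",
    start_date=datetime(2018, 1, 1),
    schedule_interval="@daily",
    params={"generation": GENERATION},  # carried with the DAG so it can be cross-checked
)


def assert_expected_generation(expected):
    """Hypothetical guard for our intercepted load path on workers: refuse to run
    if the locally loaded code is not the generation the scheduler scheduled against."""
    if expected and expected != GENERATION:
        raise RuntimeError(
            "Loaded generation %s but scheduler expected %s" % (GENERATION, expected)
        )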
