Addressing a few of your questions / concerns:

* The scheduler uses a multiprocess queue to queue up tasks; each subprocess is in charge of a single DAG "scheduler cycle", which triggers whatever it can for the active DagRuns. Currently it fills the DagBag from the local file system, looking for the specific module where the master process last saw that DAG. Fetching the DAG is a metadata operation, and DAG artifacts shouldn't be too large, so we can assume it takes seconds at most to fetch a DAG, which is ok. We generally assume that the scheduler should fully cycle every minute or so. A version-aware DagFetcher could also implement some sort of caching if that became a concern (it shouldn't, though).

* For consistency within the whole DagRun, the scheduler absolutely has to read the right version. If tasks were removed in a newer version, they would never get scheduled and consistency could not be achieved.

* TaskInstances get created the first time they are identified as runnable by the scheduler and are born with a queued status, I believe (from memory, haven't read the latest code to confirm). The worker double-checks and sets the task to running as part of a database transaction to avoid double-firing.
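To make that last bullet concrete, the double-check is essentially a compare-and-set inside a transaction, roughly along these lines (a minimal sketch with a simplified model, not the actual Airflow code):

    from sqlalchemy import Column, String
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class TaskInstance(Base):
        # Simplified stand-in for the real task_instance table, just enough for the example.
        __tablename__ = "task_instance"
        dag_id = Column(String, primary_key=True)
        task_id = Column(String, primary_key=True)
        execution_date = Column(String, primary_key=True)
        state = Column(String)

    def try_start(session, dag_id, task_id, execution_date):
        # Lock the row so two workers can't both pass the state check below.
        ti = (
            session.query(TaskInstance)
            .filter_by(dag_id=dag_id, task_id=task_id, execution_date=execution_date)
            .with_for_update()
            .one_or_none()
        )
        if ti is None or ti.state != "queued":
            session.rollback()  # someone else already picked it up (or it no longer exists)
            return False
        ti.state = "running"
        session.commit()
        return True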
Max

On Wed, Feb 28, 2018 at 7:29 AM, Chris Palmer <ch...@crpalmer.com> wrote:

> I'll preface this with the fact that I'm relatively new to Airflow, and haven't played around with a lot of the internals.
>
> I find the idea of a DagFetcher interesting, but should we worry about slowing down the scheduler significantly? If the scheduler has to "fetch" multiple different DAG versions, be it git refs or artifacts from Artifactory, we are talking about adding significant time to each scheduler run. Also, how would the scheduler know which DAGs to fetch from where if there aren't local files on disk listing those DAGs? Maybe I'm missing something in the implementation.
>
> It seems to me that the fetching of the different versions should be delegated to the Task (or TaskInstance) itself. That ensures we only spend the time to "fetch" the version that is needed, when it is needed. One downside might be that each TaskInstance running for the same version of the DAG might end up doing the "fetch" independently (duplicating that work).
>
> I think this could be done by adding a version attribute to the DagRun that gets set at creation, and having the scheduler pass that version to the TaskInstances when they are created. You could even extend this so that you could have an arbitrary set of "executor_parameters" that get set on a DagRun and are passed to TaskInstances. Then the specific Executor class that is running that TaskInstance could handle the "executor_parameters" as it sees fit.
>
> One thing I'm not clear on is how and when TaskInstances are created. When the scheduler first sees a specific DagRun, do all the TaskInstances get created immediately, with only some of them getting queued? Or does the scheduler only create those TaskInstances which can be queued right now?
>
> In particular, if a DagRun gets created and, while it is running, the DAG is updated and a new Task is added, will the scheduler pick up that new Task for the running DagRun? If the answer is yes, then my suggestion above would run the risk of scheduling a Task for a DAG version where that Task didn't exist. I'm sure you could handle that somewhat gracefully, but it's a bit ugly.
>
> Chris
>
> On Wed, Feb 28, 2018 at 2:05 AM, Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
>
> > At a higher level I want to say a few things about the idea of enforcing version consistency within a DagRun.
> >
> > One thing we've been talking about is the need for a "DagFetcher" abstraction, where its first implementation, which would replace and mimic the current one, would be "FileSystemDagFetcher". A specific DagFetcher implementation may or may not support version semantics, but if it does, it should be able to receive a version id and return the proper version of the DAG object. For instance, that first "FileSystemDagFetcher" would not support version semantics, but perhaps a "GitRepoDagFetcher" would, or an "ArtifactoryDagFetcher" or "TarballInS3DagFetcher" may as well.
> >
> > Of course that assumes that the scheduler knows and stores the active version number when generating a new DagRun, and that this information is leveraged on subsequent scheduler cycles and on workers when tasks are executed.
> >
> > This could also enable things like "remote" backfills (non-local, parallelized) of a DAG definition that's on an arbitrary git ref (assuming a "GitRepoDagFetcher").
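(For illustration, the DagFetcher abstraction described above could take roughly this shape. This is only a sketch: the class and method names, the dag_id-to-file mapping, and returning a file path for the DagBag to parse are all assumptions, not an agreed interface.)

    import os
    import subprocess
    from abc import ABC, abstractmethod

    class DagFetcher(ABC):
        """Returns DAG definition code for a given dag_id, optionally at a specific version."""

        @abstractmethod
        def fetch(self, dag_id, version=None):
            """Return a path to the DAG file to parse; version=None means 'latest'."""

    class FileSystemDagFetcher(DagFetcher):
        """Mimics today's behavior: no version semantics, always the file on local disk."""
        def __init__(self, dags_folder):
            self.dags_folder = dags_folder

        def fetch(self, dag_id, version=None):
            # version is ignored; there is only ever one copy on disk
            return os.path.join(self.dags_folder, dag_id + ".py")

    class GitRepoDagFetcher(DagFetcher):
        """Version-aware: checks out the requested ref into a per-version working copy."""
        def __init__(self, repo_url, cache_dir):
            self.repo_url = repo_url
            self.cache_dir = cache_dir

        def fetch(self, dag_id, version=None):
            ref = version or "HEAD"
            workdir = os.path.join(self.cache_dir, ref)
            if not os.path.exists(workdir):  # naive cache keyed by ref
                subprocess.check_call(["git", "clone", self.repo_url, workdir])
                subprocess.check_call(["git", "-C", workdir, "checkout", ref])
            return os.path.join(workdir, "dags", dag_id + ".py")

A version-aware fetcher like this is also where the caching mentioned earlier would naturally live (keyed by ref).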
> > There are [perhaps] unintuitive implications: clearing a single task would then re-run the old DAG definition for that task (since the version was stamped on the DagRun and hasn't changed), but deleting/recreating a DagRun would run the latest version (or any other version that may be specified, for that matter).
> >
> > I'm unclear on how much work that represents exactly, but it's certainly doable and may only require changing part of the DagBag class and a few other places.
> >
> > Max
> >
> > On Tue, Feb 27, 2018 at 6:48 PM, David Capwell <dcapw...@gmail.com> wrote:
> >
> > > Thanks for your feedback!
> > >
> > > Option 1 is a non-starter for us. The reason is we have DAGs that take 9+ hours to run.
> > >
> > > Option 2 is more where my mind was going, but it's rather large. As I see it, you need an MVCC DagBag that's aware of multiple versions (what provides the version?). Assuming you can track which versions the active dag runs point to, you know how to clean up (I'm fine with that being external). The pro here is you have snapshot isolation for a dag_run; the con is more bookkeeping, and requiring deploys to work with this (the last part may be a good thing though).
> > >
> > > The only other option I can think of is to lock deploys so the system only picks up new versions when no dag_run holds the lock. This is flawed for many reasons, and breaks horribly for dag_runs that take minutes (I assume 99% do).
> > >
> > > On Tue, Feb 27, 2018, 4:50 PM Joy Gao <j...@wepay.com> wrote:
> > >
> > > > Hi David!
> > > >
> > > > Thank you for clarifying, I think I understand your concern now. We currently also work around this by making sure a dag is turned off when we deploy a new version. We also make sure our jobs are idempotent and retry-enabled in case we forget to turn off the job, so the issue hasn't caused us too much headache.
> > > >
> > > > I do agree that it would be nice for Airflow to have the option to guarantee a single version of a dag per dag run. I see two approaches:
> > > >
> > > > (1) If a dag is updated, the current dagrun fails and/or retries.
> > > > (2) If a dag is updated, the current dagrun continues but uses the version before the update.
> > > >
> > > > (1) requires some mechanism to compare dag generations. One option is to hash the dag file, store that value in the dagrun table, and compare against it each time a task runs. If the hash value is different, update the hash value, then fail/retry the dag. I think this is a fairly safe approach.
> > > >
> > > > (2) is trickier. A dag only has a property "fileloc" which tracks the location of the dag file, but the actual content of the dag file is never versioned. When a task instance starts running, it dynamically re-processes the dag file specified by the fileloc, generates all the task objects from the dag file, and fetches the task object by task_id in order to execute it. So in order to guarantee that each dagrun runs a specific version, previous versions must be maintained on disk somehow (maintaining this information in memory is difficult, since if the scheduler/worker shuts down, that information is lost). This makes it a pretty big change, and I haven't thought much about how to implement it.
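(To illustrate the hashing mechanism in (1): something along these lines, assuming a hypothetical dag_hash column on the dagrun table; the names are invented for the example, not existing Airflow fields.)

    import hashlib

    def dag_file_fingerprint(fileloc):
        # Hash the dag file contents; this is the "generation" stamped on the dagrun.
        with open(fileloc, "rb") as f:
            return hashlib.sha256(f.read()).hexdigest()

    def generation_unchanged(dag_run, fileloc):
        # Compare the current file hash against the one stored when the dagrun started.
        # If it differs, the caller would update the stored hash and fail/retry the run,
        # per option (1) above.
        current = dag_file_fingerprint(fileloc)
        if dag_run.dag_hash is None:  # first task of the run: stamp it
            dag_run.dag_hash = current
            return True
        return dag_run.dag_hash == current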
> > > > I'm personally leaning towards (1) for the sake of simplicity. Note that some users may not want the dag to fail/retry even when the dag is updated, so this should be an optional feature, not required.
> > > >
> > > > My scheduler-foo isn't that great, so I'm curious what others have to say about this.
> > > >
> > > > On Fri, Feb 23, 2018 at 3:12 PM, David Capwell <dcapw...@gmail.com> wrote:
> > > >
> > > > > Thanks for the reply Joy, let me walk you through things as they are today:
> > > > >
> > > > > 1) We don't stop airflow or disable DAGs while deploying updates to logic; this is done live once it's released.
> > > > > 2) The python script in the DAG folder doesn't actually have DAGs in it but is a shim layer that allows us to deploy in an atomic way for a single host.
> > > > >   2.1) This script reads a file on local disk (less than disk page size) to find the latest git commit deployed.
> > > > >   2.2) It re-does the airflow DAG load process but pointing to the git commit path.
> > > > >
> > > > > Example directory structure:
> > > > >
> > > > > /airflow/dags/shim.py
> > > > > /airflow/real_dags/
> > > > >     /latest  # pointer to latest commit
> > > > >     /[git commit]/
> > > > >
> > > > > This is how we make sure deploys are consistent within a single task.
> > > > >
> > > > > Now, let's assume we have a fully atomic commit process and are able to upgrade DAGs at the exact same moment.
> > > > >
> > > > > At time T0 the scheduler knows of DAG V1 and schedules two tasks, Task1 and Task2.
> > > > > At time T1 Task1 is picked up by Worker1, which starts executing the task (V1 logic).
> > > > > At time T2 the deploy commit happens; current DAG version: V2.
> > > > > At time T3 Task2 is picked up by Worker2, which starts executing the task (V2 logic).
> > > > >
> > > > > In many cases this isn't really a problem (a tuning config change to a hadoop job), but as we have more people using Airflow this is causing a lot of time spent debugging why production acted differently than expected (the problem was already fixed... why is it still here?). We also see that some tasks expect a given behavior from other tasks, and since they live in the same git repo people can modify both tasks at the same time if a breaking change is needed; but when this rolls out to prod there isn't a way to do that other than turning off the DAG and logging in to all hosts to verify it is fully deployed.
> > > > >
> > > > > We would like to remove this confusion, expose generations/versions (same thing really) to users, and make sure that for a single dag_run only one version is used.
> > > > >
> > > > > I hope this is more clear.
> > > > >
> > > > > On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao <j...@wepay.com> wrote:
> > > > >
> > > > >> Hi David,
> > > > >>
> > > > >> Do you mind providing a concrete example of the scenario in which scheduler/workers see different states (I'm not 100% sure I understood the issue at hand)?
> > > > >>
> > > > >> And by same dag generation, are you referring to the dag version? (DAG version is currently not supported at all, but I can see it being a building block for future use cases.)
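(For readers following along, the shim layer David describes above boils down to roughly this; it is reconstructed from the description, not their actual code.)

    # /airflow/dags/shim.py -- the only file Airflow's DagBag sees directly
    import os
    from airflow.models import DagBag

    REAL_DAGS_ROOT = "/airflow/real_dags"

    # 2.1) read the small pointer file to find the latest deployed git commit
    with open(os.path.join(REAL_DAGS_ROOT, "latest")) as f:
        commit = f.read().strip()

    # 2.2) re-do the DAG load process, but against the versioned path for that commit
    dag_bag = DagBag(dag_folder=os.path.join(REAL_DAGS_ROOT, commit), include_examples=False)

    # expose the loaded DAGs at module level so Airflow picks them up from this shim
    for dag_id, dag in dag_bag.dags.items():
        globals()[dag_id] = dag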
> > > > >>
> > > > >> Joy
> > > > >>
> > > > >> On Fri, Feb 23, 2018 at 1:00 PM, David Capwell <dcapw...@gmail.com> wrote:
> > > > >>
> > > > >> > My current thinking is to add a field to the dag table that is optional and provided by the dag. We currently intercept the load path, so we could use this field to make sure we load the same generation. My concern here is the interaction with the scheduler; I'm not familiar enough with that logic to predict corner cases where this would fail.
> > > > >> >
> > > > >> > Any other recommendations for how this could be done?
> > > > >> >
> > > > >> > On Mon, Feb 19, 2018, 10:33 PM David Capwell <dcapw...@gmail.com> wrote:
> > > > >> >
> > > > >> > > We have been using airflow for logic that delegates to other systems, so we inject a task that all tasks depend on to make sure all resources used are the same for all tasks in the dag. This works well for tasks that delegate to external systems, but people are starting to need to run logic in airflow, and the fact that the scheduler and all the workers can see different states is causing issues.
> > > > >> > >
> > > > >> > > We can make sure that all the code is deployed in a consistent way, but we need help from the scheduler to tell the workers the current generation for a DAG.
> > > > >> > >
> > > > >> > > My question is, what would be the best way to modify airflow to allow DAGs to define a generation value that the scheduler could send to workers?
> > > > >> > >
> > > > >> > > Thanks
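(As an aside, the "inject a task that everything depends on" pattern from that first message might look roughly like this, using 1.x-era imports; the paths, ids, and schedule are made up for the example and borrow David's directory layout.)

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    def pin_generation(**context):
        # Resolve whatever must stay constant for the whole run (e.g. a git commit
        # or artifact version) exactly once, and publish it via XCom.
        with open("/airflow/real_dags/latest") as f:
            generation = f.read().strip()
        context["ti"].xcom_push(key="generation", value=generation)

    def do_work(**context):
        # Every downstream task reads the pinned value instead of resolving it again.
        generation = context["ti"].xcom_pull(task_ids="pin_generation", key="generation")
        print("running against generation %s" % generation)

    with DAG("example_pinned_dag",
             start_date=datetime(2018, 1, 1),
             schedule_interval="@daily") as dag:
        pin = PythonOperator(task_id="pin_generation",
                             python_callable=pin_generation,
                             provide_context=True)
        work = PythonOperator(task_id="do_work",
                              python_callable=do_work,
                              provide_context=True)
        pin >> work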