Addressing a few of your questions / concerns:

* The scheduler uses a multiprocess queue to queue up tasks; each
subprocess is in charge of a single DAG "scheduler cycle", which triggers
what it can for active DagRuns. Currently it fills the DagBag from the
local file system, looking for the specific module where the master process
last saw that DAG. Fetching the DAG is a metadata operation; DAG artifacts
shouldn't be too large, so we can assume it takes at most a few seconds to
fetch a DAG, which is fine. We generally assume that the scheduler should
fully cycle every minute or so. A version-aware DagFetcher could also
implement some sort of caching if that were a concern (it shouldn't be,
though); see the sketch after this list.
* For consistency within the whole DagRun, the scheduler absolutely has to
read the right version. If tasks were removed, they would never get
scheduled and consistency could not be achieved.
* TaskInstances get created the first time they are identified as runnable
by the scheduler and are born with a queued status, I believe (from memory,
I haven't read the latest code to confirm). The worker double-checks and
sets them to running as part of a database transaction to avoid
double-firing (a rough sketch of that check-and-set also follows below).
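
For what it's worth, here is a rough sketch of what a version-aware
DagFetcher with simple caching could look like; the class and method names
are illustrative only, not existing Airflow APIs:

    import os
    import subprocess

    from airflow.models import DagBag


    class DagFetcher(object):
        """Illustrative interface: return a DAG object for (dag_id, version)."""

        def fetch(self, dag_id, version=None):
            raise NotImplementedError


    class GitRepoDagFetcher(DagFetcher):
        """Hypothetical fetcher that pins DAG definitions to a git ref."""

        def __init__(self, repo_url, checkout_root):
            self.repo_url = repo_url
            self.checkout_root = checkout_root
            self._checkouts = {}  # version -> working copy path (the "cache")

        def _checkout(self, version):
            # One working copy per ref, kept on disk, so repeated fetches of
            # the same version cost almost nothing.
            if version not in self._checkouts:
                target = os.path.join(self.checkout_root, version)
                if not os.path.isdir(target):
                    subprocess.check_call(["git", "clone", self.repo_url, target])
                    subprocess.check_call(["git", "-C", target, "checkout", version])
                self._checkouts[version] = target
            return self._checkouts[version]

        def fetch(self, dag_id, version=None):
            dag_folder = self._checkout(version or "master")
            # Reuse the normal DagBag parsing, just rooted at the pinned ref.
            return DagBag(dag_folder=dag_folder).get_dag(dag_id)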
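
And the double-firing guard in the last bullet is essentially a
compare-and-swap on the task instance row; a minimal sketch of the idea
(the table and column names here are illustrative, not the exact Airflow
schema, and the connection is a plain DB-API connection):

    def try_start(conn, dag_id, task_id, execution_date):
        # Atomically flip queued -> running; if another worker won the race,
        # no row is updated and we skip execution instead of double-firing.
        cur = conn.execute(
            "UPDATE task_instance SET state = 'running' "
            "WHERE dag_id = ? AND task_id = ? AND execution_date = ? "
            "AND state = 'queued'",
            (dag_id, task_id, execution_date))
        conn.commit()
        return cur.rowcount == 1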

Max

On Wed, Feb 28, 2018 at 7:29 AM, Chris Palmer <ch...@crpalmer.com> wrote:

> I'll preface this with the fact that I'm relatively new to Airflow, and
> haven't played around with a lot of the internals.
>
> I find the idea of a DagFetcher interesting, but should we worry about
> slowing down the scheduler significantly? If the scheduler has to
> "fetch" multiple different DAG versions, be it git refs or artifacts from
> Artifactory, we are talking about adding significant time to each scheduler
> run. Also how would the scheduler know which DAGs to fetch from where if
> there aren't local files on disk listing those DAGs? Maybe I'm missing
> something in the implementation.
>
> It seems to me that the fetching of the different versions should be
> delegated to the Task (or TaskInstance) itself. That ensures we only spend
> the time to "fetch" the version that is needed when it is needed. One down
> side might be that each TaskInstance running for the same version of the
> DAG might end up doing the "fetch" independently (duplicating that work).
>
> I think this could be done by adding some version attribute to the DagRun
> that gets set at creation, and have the scheduler pass that version to the
> TaskInstances when they are created. You could even extend this so that you
> could have an arbitrary set of "executor_parameters" that get set on a
> DagRun and are passed to TaskInstances. Then the specific Executor class
> that is running that TaskInstance could handle the "executor_parameters" as
> it sees fit.
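>
> Roughly something like this (purely illustrative classes, not the actual
> Airflow models):
>
>     class DagRun(object):
>         def __init__(self, dag_id, execution_date, version=None,
>                      executor_parameters=None):
>             self.dag_id = dag_id
>             self.execution_date = execution_date
>             # Stamped once when the run is created and never changed after.
>             self.version = version
>             self.executor_parameters = executor_parameters or {}
>
>     class TaskInstance(object):
>         def __init__(self, task_id, dag_run):
>             self.task_id = task_id
>             # Inherit the run's version/parameters so every task in this run
>             # fetches and executes the same DAG definition.
>             self.version = dag_run.version
>             self.executor_parameters = dag_run.executor_parameters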
>
> One thing I'm not clear on is how and when TaskInstances are created. When
> the scheduler first sees a specific DagRun, do all the TaskInstances get
> created immediately, but only some of them get queued? Or does the
> scheduler only create those TaskInstances which can be queued right now?
>
> In particular, if a DagRun gets created and, while it is running, the DAG is
> updated and a new Task is added, will the scheduler pick up that new Task
> for the running DagRun? If the answer is yes, then my suggestion above
> would run the risk of scheduling a Task for a DAG version where that Task
> didn't exist. I'm sure you could handle that somewhat gracefully but it's a
> bit ugly.
>
> Chris
>
> On Wed, Feb 28, 2018 at 2:05 AM, Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
>
> > At a higher level I want to say a few things about the idea of enforcing
> > version consistency within a DagRun.
> >
> > One thing we've been talking about is the need for a "DagFetcher"
> > abstraction, whose first implementation, replacing and mimicking the
> > current behavior, would be a "FileSystemDagFetcher". A specific DagFetcher
> > implementation may or may not support version semantics, but if it does it
> > should be able to receive a version id and return the proper version of the
> > DAG object. For instance that first "FileSystemDagFetcher" would not
> > support version semantics, but perhaps a "GitRepoDagFetcher" would, and an
> > "ArtifactoryDagFetcher" or a "TarballInS3DagFetcher" might as well.
> >
> > Of course that assumes that the scheduler knows and stores the active
> > version number when generating a new DagRun, and that this information is
> > leveraged on subsequent scheduler cycles and on workers when tasks are
> > executed.
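> >
> > To make that concrete, a rough sketch of the flow, assuming some DagRun
> > model with a version column and a fetcher exposing current_version() and
> > fetch() (all hypothetical, none of this is current code):
> >
> >     def create_dag_run(dag_id, execution_date, fetcher):
> >         # Stamp whatever the fetcher currently considers "latest" onto the
> >         # new DagRun; it never changes for the lifetime of the run.
> >         return DagRun(dag_id=dag_id, execution_date=execution_date,
> >                       version=fetcher.current_version())
> >
> >     def resolve_dag(task_instance, fetcher):
> >         # Subsequent scheduler cycles and the workers resolve the DAG
> >         # through the stamped version, so the whole DagRun sees exactly
> >         # one definition.
> >         return fetcher.fetch(task_instance.dag_id,
> >                              version=task_instance.dag_run.version)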
> >
> > This could also enable things like "remote" backfills (non-local,
> > parallelized) of a DAG definition that's on an arbitrary git ref (assuming
> > a "GitRepoDagFetcher").
> >
> > There are [perhaps] unintuitive implications where clearing a single task
> > would then re-run the old DAG definition on that task (since the version
> > was stamped in the DagRun and hasn't changed), but deleting/recreating a
> > DagRun would run the latest version (or any other version that may be
> > specified for that matter).
> >
> > I'm unclear on how much work that represents exactly, but it's certainly
> > doable and may only require changing part of the DagBag class and a few
> > other places.
> >
> > Max
> >
> > On Tue, Feb 27, 2018 at 6:48 PM, David Capwell <dcapw...@gmail.com> wrote:
> >
> > > Thanks for your feedback!
> > >
> > > Option 1 is a non-starter for us. The reason is we have DAGs that take
> > > 9+ hours to run.
> > >
> > > Option 2 is more where my mind was going, but it's rather large.  The way
> > > I see it, you need an MVCC DagBag that's aware of multiple versions (what
> > > provides the version?).  Assuming you can track which versions the active
> > > dag runs point to, you know how to clean up (fine with doing that
> > > externally).  The pro here is you get snapshot isolation for the dag_run;
> > > the con is more bookkeeping and requiring the deploy process to work with
> > > this (that last part may be a good thing though).
> > >
> > > The only other option I can think of is to lock deploys so the system
> > > only picks up new versions when no dag_run holds the lock.  This is flawed
> > > for many reasons, but it breaks horribly for dag_runs that take minutes (I
> > > assume 99% do).
> > >
> > >
> > >
> > > On Tue, Feb 27, 2018, 4:50 PM Joy Gao <j...@wepay.com> wrote:
> > >
> > > > Hi David!
> > > >
> > > > Thank you for clarifying, I think I understand your concern now. We
> > > > currently also work around this by making sure a dag is turned off
> > > > when we deploy a new version. We also make sure our jobs are
> > > > idempotent and retry-enabled in the case when we forget to turn off
> > > > the job, so the issue hasn't caused us too much headache.
> > > >
> > > > I do agree that it would be nice for Airflow to have the option to
> > > > guarantee a single version of dag per dag run. I see two approaches:
> > > >
> > > > (1) If a dag is updated, the current dagrun fails and/or retries.
> > > > (2) If a dag is updated, the current dagrun continues but uses the
> > > > version from before the update.
> > > >
> > > > (1) requires some mechanism to compare dag generations. One option is
> > > > to hash the dag file, store that value in the dagrun table, and compare
> > > > against it each time a task runs. If the hash value is different, update
> > > > the hash value, then fail/retry the dag. I think this is a fairly safe
> > > > approach.
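> > > >
> > > > A minimal sketch of that check (the dag_hash column on the dagrun is
> > > > made up here):
> > > >
> > > >     import hashlib
> > > >
> > > >     def dag_file_hash(fileloc):
> > > >         # Hash the raw dag file contents; any edit changes the digest.
> > > >         with open(fileloc, "rb") as f:
> > > >             return hashlib.sha256(f.read()).hexdigest()
> > > >
> > > >     def check_dag_generation(dag_run, fileloc):
> > > >         current = dag_file_hash(fileloc)
> > > >         if dag_run.dag_hash is None:
> > > >             # First task of the run stamps the generation.
> > > >             dag_run.dag_hash = current
> > > >         elif dag_run.dag_hash != current:
> > > >             # The dag changed mid-run: record the new hash, then
> > > >             # fail/retry the dagrun.
> > > >             dag_run.dag_hash = current
> > > >             raise RuntimeError("dag file changed during this dagrun")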
> > > >
> > > > (2) is trickier. A dag only has a property "fileloc" which tracks the
> > > > location of the dag file, but the actual content of the dag file is
> > > > never versioned. When a task instance starts running, it dynamically
> > > > re-processes the dag file specified by the fileloc, generates all the
> > > > task objects from the dag file, and fetches the task object by task_id
> > > > in order to execute it. So in order to guarantee that each dagrun runs a
> > > > specific version, previous versions must be maintained on disk somehow
> > > > (maintaining this information in memory is difficult, since if the
> > > > scheduler/worker shuts down, that information is lost). This makes it
> > > > a pretty big change, and I haven't thought much about how to implement
> > > > it.
> > > >
> > > > I'm personally leaning towards (1) for the sake of simplicity. Note that
> > > > some users may not want a dag to fail/retry even when the dag is updated,
> > > > so this should be an optional feature, not required.
> > > >
> > > > My scheduler-foo isn't that great, so curious what others have to say
> > > > about this.
> > > >
> > > > On Fri, Feb 23, 2018 at 3:12 PM, David Capwell <dcapw...@gmail.com> wrote:
> > > > > Thanks for the reply Joy, let me walk you through things as they are today
> > > > >
> > > > > 1) we don't stop airflow or disable DAGs while deploying updates to logic;
> > > > > this is done live once it's released
> > > > > 2) the python script in the DAG folder doesn't actually have DAGs in it but
> > > > > is a shim layer that lets us deploy in an atomic way for a single host
> > > > >   2.1) this script reads a file on local disk (smaller than a disk page) to
> > > > > find the latest git commit deployed
> > > > >   2.2) it re-does the airflow DAG load process but pointing at the git commit
> > > > > path
> > > > >
> > > > > Example directory structure
> > > > >
> > > > > /airflow/dags/shim.py
> > > > > /airflow/real_dags/
> > > > >                             /latest # pointer to latest commit
> > > > >                             /[git commit]/
> > > > >
> > > > > This is how we make sure deploys are consistent within a single task.
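> > > > >
> > > > > A simplified sketch of what the shim does (not our exact code):
> > > > >
> > > > >     import importlib.util
> > > > >     import os
> > > > >
> > > > >     from airflow.models import DAG
> > > > >
> > > > >     DAG_ROOT = "/airflow/real_dags"
> > > > >
> > > > >     # The pointer file is tiny (under a disk page), so reading it is
> > > > >     # effectively atomic for our purposes.
> > > > >     with open(os.path.join(DAG_ROOT, "latest")) as f:
> > > > >         COMMIT = f.read().strip()
> > > > >
> > > > >     def _load(path):
> > > > >         name = os.path.splitext(os.path.basename(path))[0]
> > > > >         spec = importlib.util.spec_from_file_location(name, path)
> > > > >         module = importlib.util.module_from_spec(spec)
> > > > >         spec.loader.exec_module(module)
> > > > >         return module
> > > > >
> > > > >     # Import every dag file under the pinned commit and re-export its
> > > > >     # DAG objects so Airflow's DagBag discovers them via this shim.
> > > > >     commit_dir = os.path.join(DAG_ROOT, COMMIT)
> > > > >     for fname in os.listdir(commit_dir):
> > > > >         if fname.endswith(".py"):
> > > > >             module = _load(os.path.join(commit_dir, fname))
> > > > >             globals().update({k: v for k, v in vars(module).items()
> > > > >                               if isinstance(v, DAG)})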
> > > > >
> > > > >
> > > > > Now, let's assume we have a fully atomic commit process and are able to
> > > > > upgrade DAGs at the exact same moment.
> > > > >
> > > > > At time T0 the scheduler knows of DAG V1 and schedules two tasks, Task1
> > > > > and Task2
> > > > > At time T1 Task1 is picked up by Worker1, which starts executing the task
> > > > > (V1 logic)
> > > > > At time T2 the deploy commit happens; current DAG version: V2
> > > > > At time T3 Task2 is picked up by Worker2, which starts executing the task
> > > > > (V2 logic)
> > > > >
> > > > > In many cases this isn't really a problem (a tuning config change to a
> > > > > hadoop job), but as we have more people using Airflow this is causing a lot
> > > > > of time spent debugging why production acted differently than expected (the
> > > > > problem was already fixed... why is it still here?).  We also see that some
> > > > > tasks expect a given behavior from other tasks, and since they live in the
> > > > > same git repo people can modify both tasks at the same time if a breaking
> > > > > change is needed, but when this rolls out to prod there isn't a way to do
> > > > > this other than turning off the DAG and logging in to all hosts to verify it
> > > > > is fully deployed.
> > > > >
> > > > > We would like to remove this confusion, expose generations/versions (same
> > > > > thing really) to users, and make sure that for a single dag_run only one
> > > > > version is used.
> > > > >
> > > > > I hope this is more clear.
> > > > >
> > > > > On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao <j...@wepay.com> wrote:
> > > > >
> > > > >> Hi David,
> > > > >>
> > > > >> Do you mind providing a concrete example of the scenario in which
> > > > >> scheduler/workers see different states? (I'm not 100% sure I
> > > > >> understood the issue at hand.)
> > > > >>
> > > > >> And by same dag generation, are you referring to the dag version? (DAG
> > > > >> version is currently not supported at all, but I can see it being a
> > > > >> building block for future use cases).
> > > > >>
> > > > >> Joy
> > > > >>
> > > > >> On Fri, Feb 23, 2018 at 1:00 PM, David Capwell <dcapw...@gmail.com> wrote:
> > > > >>
> > > > >> > My current thinking is to add a field to the dag table that is
> > > > >> > optional and provided by the dag. We currently intercept the load
> > > > >> > path, so we could use this field to make sure we load the same
> > > > >> > generation.  My concern here is the interaction with the scheduler;
> > > > >> > I'm not familiar enough with that logic to predict corner cases
> > > > >> > where this would fail.
> > > > >> >
> > > > >> > Any other recommendations for how this could be done?
> > > > >> >
> > > > >> > On Mon, Feb 19, 2018, 10:33 PM David Capwell <dcapw...@gmail.com> wrote:
> > > > >> >
> > > > >> > > We have been using airflow for logic that delegates to other systems, so
> > > > >> > > we inject a task that all tasks depend on to make sure the resources used
> > > > >> > > are the same for all tasks in the dag. This works well for tasks that
> > > > >> > > delegate to external systems, but people are starting to need to run logic
> > > > >> > > in airflow, and the fact that the scheduler and all workers can see
> > > > >> > > different states is causing issues.
> > > > >> > >
> > > > >> > > We can make sure that all the code is deployed in a consistent way, but
> > > > >> > > we need help from the scheduler to tell the workers the current generation
> > > > >> > > for a DAG.
> > > > >> > >
> > > > >> > > My question is, what would be the best way to modify airflow to
> > > > >> > > allow DAGs to define a generation value that the scheduler could
> > > > >> > > send to workers?
> > > > >> > >
> > > > >> > > Thanks
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > > >
> > >
> >
>
